Skip to main content

异步编程

info

异步编程是 Python 3.5+ 引入的重要特性,通过 async/await 语法提供了一种优雅的方式来处理并发操作。它特别适合 I/O 密集型任务,如网络请求、数据库查询等,可以显著提高程序性能。

# 传统同步方式 - 总耗时 = 任务1 + 任务2 + 任务3
# 异步方式 - 总耗时 ≈ max(任务1, 任务2, 任务3)

async def main():
# 三个网络请求可以并发执行
results = await asyncio.gather(
fetch_data('url1'),
fetch_data('url2'),
fetch_data('url3')
)

PEP 492 – 协程与 async/await 语法

异步编程

异步编程是一种编程范式,它允许程序在等待某些操作(如 I/O)完成时继续执行其他任务,而不是阻塞等待。

异步编程的优势:

  • 高效并发:同时处理多个 I/O 操作,不需要多线程
  • 资源节省:避免了线程切换的开销
  • 代码优雅:使用 async/await 语法,避免回调地狱
  • 性能提升:在 I/O 密集型任务中性能显著提升

适用场景:

  • 网络请求(API 调用、爬虫)
  • 数据库查询
  • 文件 I/O 操作
  • WebSocket 通信
warning

异步编程不适合 CPU 密集型任务。对于计算密集型操作,应该使用多进程(multiprocessing)。

async 和 await 关键字

async 关键字用于定义协程函数,await 关键字用于等待协程执行完成。

async 协程函数不能直接调用,最终都需要通过asyncio标准库调用。

import asyncio

async def hello():
await asyncio.sleep(1)
return "Hello, World!"

# 在另一个 async 函数内部使用 await 调用协程函数
async def main():
"""但最终还是需要回到asyncio.run()中执行"""
result = await hello()
print(result)
# 方式1: asyncio.run() - Python 3.7+ 推荐方式
asyncio.run(main())

# 方式2: asyncio.create_task() - 在运行的事件循环中创建任务
async def create_tasks():
task = asyncio.create_task(hello()) # 创建任务
# 可以在等待之前做其他事情
print("任务已创建,继续做其他事情...")
result = await task # 执行任务,并等待任务完成
print(result)

asyncio.run(create_tasks())

# 方式3: asyncio.gather() - 并发执行多个协程
async def run_multiple():
results = await asyncio.gather(hello(), hello(), hello())
print(results) # ['Hello, World!', 'Hello, World!', 'Hello, World!']

asyncio.run(run_multiple())

# 方式4: 手动管理事件循环
loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(hello())
print(result)
finally:
loop.close()

async for

异步迭代器使用 async for 语法,适用于需要异步获取数据的场景。

import asyncio

class AsyncRange:
"""异步范围迭代器"""
def __init__(self, start, end):
self.current = start
self.end = end

def __aiter__(self):
return self

async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.5) # 模拟异步操作
self.current += 1
return self.current - 1

async def main():
# 使用 async for 迭代
async for num in AsyncRange(0, 5):
print(f"获取数字: {num}")

asyncio.run(main())
# 输出:
# 获取数字: 0
# 获取数字: 1
# 获取数字: 2
# 获取数字: 3
# 获取数字: 4

async with

异步上下文管理器使用 async with 语法,用于管理异步资源。

import asyncio

class AsyncResource:
"""异步资源管理器"""
async def __aenter__(self):
print("获取资源...")
await asyncio.sleep(1)
print("资源已获取")
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源...")
await asyncio.sleep(1)
print("资源已释放")

async def main():
async with AsyncResource() as resource:
print("使用资源")
await asyncio.sleep(0.5)

asyncio.run(main())
# 输出:
# 获取资源...
# 资源已获取
# 使用资源
# 释放资源...
# 资源已释放

内置函数

aiter 函数和 anext 函数

aiter()anext() 是异步版本的 iter()next(),用于处理异步迭代器。

函数签名:

  • aiter(async_iterable) -> async_iterator
  • anext(async_iterator, default=None) -> awaitable

魔术方法与内置函数的关系:

魔术方法内置函数说明
__aiter__()aiter()返回异步迭代器对象
__anext__()anext()返回下一个异步迭代值
__iter__()iter()同步版本:返回迭代器
__next__()next()同步版本:返回下一个值
import asyncio

class AsyncCounter:
"""异步计数器"""
def __init__(self, max_count):
self.count = 0
self.max_count = max_count

def __aiter__(self):
# aiter(obj) 会调用 obj.__aiter__()
return self

async def __anext__(self):
# anext(obj) 会调用 obj.__anext__()
if self.count >= self.max_count:
raise StopAsyncIteration
await asyncio.sleep(0.5)
self.count += 1
return self.count

async def main():
counter = AsyncCounter(3)

# 使用 aiter 和 anext
async_iter = aiter(counter)

# 获取第一个值
value1 = await anext(async_iter)
print(f"值1: {value1}") # 值1: 1

# 获取第二个值
value2 = await anext(async_iter)
print(f"值2: {value2}") # 值2: 2

# 获取第三个值
value3 = await anext(async_iter)
print(f"值3: {value3}") # 值3: 3

# 使用默认值避免 StopAsyncIteration
value4 = await anext(async_iter, "没有更多数据")
print(f"值4: {value4}") # 值4: 没有更多数据

asyncio.run(main())
tip

理解魔术方法与内置函数的关系:

内置函数是对魔术方法的封装,提供了更友好的调用方式。

import asyncio

class MyAsyncIterable:
def __aiter__(self):
return self

async def __anext__(self):
# 返回下一个值...
pass

obj = MyAsyncIterable()

# 这两种方式是等价的:
async_iter1 = aiter(obj) # 推荐:使用内置函数
async_iter2 = obj.__aiter__() # 不推荐:直接调用魔术方法

# 同样,这两种方式也是等价的:
value1 = await anext(async_iter1) # 推荐
value2 = await async_iter2.__anext__() # 不推荐

为什么要用内置函数?

  1. 可读性更好aiter(obj)obj.__aiter__() 更清晰
  2. 遵循约定:Python 风格指南推荐使用内置函数
  3. 类型检查:内置函数会进行必要的检查和处理
  4. 向后兼容:内置函数可能包含额外的逻辑和优化

类比同步版本:

# 同步迭代器
my_list = [1, 2, 3]

# 推荐写法
it = iter(my_list) # 而不是 my_list.__iter__()
value = next(it) # 而不是 it.__next__()

# 异步迭代器
async def demo():
async_obj = MyAsyncIterable()

# 推荐写法
async_it = aiter(async_obj) # 而不是 async_obj.__aiter__()
value = await anext(async_it) # 而不是 await async_it.__anext__()
# 实际应用:异步数据流处理
import asyncio

async def async_data_stream():
"""模拟异步数据流"""
for i in range(5):
await asyncio.sleep(0.5)
yield f"数据-{i}"

async def process_stream():
# 使用 async for 处理数据流
async for data in async_data_stream():
print(f"处理: {data}")

# 等价于使用 aiter 和 anext
print("\n使用 aiter/anext:")
stream = aiter(async_data_stream())
while True:
try:
data = await anext(stream)
print(f"处理: {data}")
except StopAsyncIteration:
break

asyncio.run(process_stream())

asyncio

asyncio 常用函数

虽然 asyncio 是标准库,但这里列出一些常用函数:

import asyncio

async def demo_asyncio_functions():
# 1. asyncio.sleep() - 异步睡眠
await asyncio.sleep(1)

# 2. asyncio.wait_for() - 设置超时
try:
result = await asyncio.wait_for(
asyncio.sleep(5),
timeout=2.0
)
except asyncio.TimeoutError:
print("操作超时")

# 3. asyncio.as_completed() - 按完成顺序获取结果
tasks = [
asyncio.create_task(asyncio.sleep(2, result="慢任务")),
asyncio.create_task(asyncio.sleep(1, result="快任务"))
]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"完成: {result}")
# 输出顺序: 快任务, 慢任务

asyncio.run(demo_asyncio_functions())

asyncio.gather()

使用 asyncio.gather()asyncio.create_task() 可以并发执行多个协程。

函数签名:

  • asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) -> List[Any]

参数说明:

  • *coros_or_futures:协程或 future 对象
  • loop=None:事件循环,默认使用当前事件循环
  • return_exceptions=False:是否返回异常,默认不返回

返回值说明:

  • 返回结果为列表,列表中包含所有协程或 future 对象的返回结果
  • 如果return_exceptions=True,则返回结果为列表,列表中包含所有协程或 future 对象的返回结果,包括异常
import asyncio
import time

async def task(name, duration):
"""模拟一个异步任务"""
print(f"任务 {name} 开始")
await asyncio.sleep(duration)
print(f"任务 {name} 完成")
return f"结果-{name}"

async def main():
# 方法1: 使用 gather 并发执行
start = time.time()
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
print(f"结果: {results}")
print(f"总耗时: {time.time() - start:.2f}秒")
# 总耗时约3秒(最长任务的时间),而非6秒

asyncio.run(main())
# 输出:
# 任务 A 开始
# 任务 B 开始
# 任务 C 开始
# 任务 B 完成
# 任务 A 完成
# 任务 C 完成
# 结果: ['结果-A', '结果-B', '结果-C']
# 总耗时: 3.00秒

asyncio.create_task()

使用 asyncio.gather()asyncio.create_task() 可以并发执行多个协程。

函数签名:

  • asyncio.create_task(coro, *, name=None)

参数说明:

  • coro:协程对象
  • name=None:任务名称,默认不设置

返回值说明:

  • 返回结果为任务对象
import asyncio
import time

async def task(name, duration):
"""模拟一个异步任务"""
print(f"任务 {name} 开始")
await asyncio.sleep(duration)
print(f"任务 {name} 完成")
return f"结果-{name}"

async def main2():
start = time.time()

# 创建任务,立即开始执行
task1 = asyncio.create_task(task("X", 2))
task2 = asyncio.create_task(task("Y", 1))
task3 = asyncio.create_task(task("Z", 3))

# 等待所有任务完成
result1 = await task1
result2 = await task2
result3 = await task3

print(f"结果: {result1}, {result2}, {result3}")
print(f"总耗时: {time.time() - start:.2f}秒")

asyncio.run(main2())