异步编程
info
异步编程是 Python 3.5+ 引入的重要特性,通过 async
/await
语法提供了一种优雅的方式来处理并发操作。它特别适合 I/O 密集型任务,如网络请求、数据库查询等,可以显著提高程序性能。
# 传统同步方式 - 总耗时 = 任务1 + 任务2 + 任务3
# 异步方式 - 总耗时 ≈ max(任务1, 任务2, 任务3)
async def main():
# 三个网络请求可以并发执行
results = await asyncio.gather(
fetch_data('url1'),
fetch_data('url2'),
fetch_data('url3')
)
异步编程
异步编程是一种编程范式,它允许程序在等待某些操作(如 I/O)完成时继续执行其他任务,而不是阻塞等待。
异步编程的优势:
- 高效并发:同时处理多个 I/O 操作,不需要多线程
- 资源节省:避免了线程切换的开销
- 代码优雅:使用 async/await 语法,避免回调地狱
- 性能提升:在 I/O 密集型任务中性能显著提升
适用场景:
- 网络请求(API 调用、爬虫)
- 数据库查询
- 文件 I/O 操作
- WebSocket 通信
warning
异步编程不适合 CPU 密 集型任务。对于计算密集型操作,应该使用多进程(multiprocessing
)。
async 和 await 关键字
async
关键字用于定义协程函数,await
关键字用于等待协程执行完成。
async 协程函数不能直接调用,最终都需要通过asyncio
标准库调用。
import asyncio
async def hello():
await asyncio.sleep(1)
return "Hello, World!"
# 在另一个 async 函数内部使用 await 调用协程函数
async def main():
"""但最终还是需要回到asyncio.run()中执行"""
result = await hello()
print(result)
# 方式1: asyncio.run() - Python 3.7+ 推荐方式
asyncio.run(main())
# 方式2: asyncio.create_task() - 在运行的事件循环中创建任务
async def create_tasks():
task = asyncio.create_task(hello()) # 创建任务
# 可以在等待之前做其他事情
print("任务已创建,继续做其他事情...")
result = await task # 执行任务,并等待任务完成
print(result)
asyncio.run(create_tasks())
# 方式3: asyncio.gather() - 并发执行多个协程
async def run_multiple():
results = await asyncio.gather(hello(), hello(), hello())
print(results) # ['Hello, World!', 'Hello, World!', 'Hello, World!']
asyncio.run(run_multiple())
# 方式4: 手动管理事件循环
loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(hello())
print(result)
finally:
loop.close()
async for
异步迭代器使用 async for
语法,适用于需要异步获取数据的场景。
import asyncio
class AsyncRange:
"""异步范围迭代器"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.5) # 模拟异步操作
self.current += 1
return self.current - 1
async def main():
# 使用 async for 迭代
async for num in AsyncRange(0, 5):
print(f"获取数字: {num}")
asyncio.run(main())
# 输出:
# 获取数字: 0
# 获取数字: 1
# 获取数字: 2
# 获取数字: 3
# 获取数字: 4
async with
异步上下文管理器使用 async with
语法,用 于管理异步资源。
import asyncio
class AsyncResource:
"""异步资源管理器"""
async def __aenter__(self):
print("获取资源...")
await asyncio.sleep(1)
print("资源已获取")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源...")
await asyncio.sleep(1)
print("资源已释放")
async def main():
async with AsyncResource() as resource:
print("使用资源")
await asyncio.sleep(0.5)
asyncio.run(main())
# 输出:
# 获取资源...
# 资源已获取
# 使用资源
# 释放资源...
# 资源已释放
内置函数
aiter 函数和 anext 函数
aiter()
和 anext()
是异步版本的 iter()
和 next()
,用于处理异步迭代器。
函数签名:
aiter(async_iterable) -> async_iterator
anext(async_iterator, default=None) -> awaitable
魔术方法与内置函数的关系:
魔术方法 | 内置函数 | 说明 |
---|---|---|
__aiter__() | aiter() | 返回异步迭代器对象 |
__anext__() | anext() | 返回下一个异步迭代值 |
__iter__() | iter() | 同步版本:返回迭代器 |
__next__() | next() | 同步版本:返回下一个值 |
import asyncio
class AsyncCounter:
"""异步计数器"""
def __init__(self, max_count):
self.count = 0
self.max_count = max_count
def __aiter__(self):
# aiter(obj) 会调用 obj.__aiter__()
return self
async def __anext__(self):
# anext(obj) 会调用 obj.__anext__()
if self.count >= self.max_count:
raise StopAsyncIteration
await asyncio.sleep(0.5)
self.count += 1
return self.count
async def main():
counter = AsyncCounter(3)
# 使用 aiter 和 anext
async_iter = aiter(counter)
# 获取第一个值
value1 = await anext(async_iter)
print(f"值1: {value1}") # 值1: 1
# 获取第二个值
value2 = await anext(async_iter)
print(f"值2: {value2}") # 值2: 2
# 获取第三个值
value3 = await anext(async_iter)
print(f"值3: {value3}") # 值3: 3
# 使用默认值避免 StopAsyncIteration
value4 = await anext(async_iter, "没有更多数据")
print(f"值4: {value4}") # 值4: 没有更多数据
asyncio.run(main())
tip
理解魔术方法与内置函数的关系:
内置函数是对魔术方法的封装,提供了更友好的调用方式。
import asyncio
class MyAsyncIterable:
def __aiter__(self):
return self
async def __anext__(self):
# 返回下一个值...
pass
obj = MyAsyncIterable()
# 这两种方式是等价的:
async_iter1 = aiter(obj) # 推荐:使用内置函数
async_iter2 = obj.__aiter__() # 不推荐:直接调用魔术方法
# 同样,这两种方式也是等价的:
value1 = await anext(async_iter1) # 推荐
value2 = await async_iter2.__anext__() # 不推荐
为什么要用内置函数?
- 可读性更好:
aiter(obj)
比obj.__aiter__()
更清晰 - 遵循约定:Python 风格指南推荐使用内置函数
- 类型检查:内置函数会进行必要的检查和处理
- 向后兼容:内置函数可能包含额外的逻辑和优化
类比同步版本:
# 同步迭代器
my_list = [1, 2, 3]
# 推荐写 法
it = iter(my_list) # 而不是 my_list.__iter__()
value = next(it) # 而不是 it.__next__()
# 异步迭代器
async def demo():
async_obj = MyAsyncIterable()
# 推荐写法
async_it = aiter(async_obj) # 而不是 async_obj.__aiter__()
value = await anext(async_it) # 而不是 await async_it.__anext__()
# 实际应用:异步数据流处理
import asyncio
async def async_data_stream():
"""模拟异步数据流"""
for i in range(5):
await asyncio.sleep(0.5)
yield f"数据-{i}"
async def process_stream():
# 使用 async for 处理数据流
async for data in async_data_stream():
print(f"处理: {data}")
# 等价于使用 aiter 和 anext
print("\n使用 aiter/anext:")
stream = aiter(async_data_stream())
while True:
try:
data = await anext(stream)
print(f"处理: {data}")
except StopAsyncIteration:
break
asyncio.run(process_stream())
asyncio
asyncio 常用函数
虽然 asyncio 是标准库,但这里列出一些常用函数:
import asyncio
async def demo_asyncio_functions():
# 1. asyncio.sleep() - 异步睡眠
await asyncio.sleep(1)
# 2. asyncio.wait_for() - 设置超时
try:
result = await asyncio.wait_for(
asyncio.sleep(5),
timeout=2.0
)
except asyncio.TimeoutError:
print("操作超时")
# 3. asyncio.as_completed() - 按完成顺序获取结果
tasks = [
asyncio.create_task(asyncio.sleep(2, result="慢任务")),
asyncio.create_task(asyncio.sleep(1, result="快任务"))
]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"完成: {result}")
# 输出顺序: 快任务, 慢任务
asyncio.run(demo_asyncio_functions())
asyncio.gather()
使用 asyncio.gather()
或 asyncio.create_task()
可以并发执行多个协程。
函数签名:
asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) -> List[Any]
参数说明:
*coros_or_futures
:协程或 future 对象loop=None
:事件循环,默认使用当前事件循环return_exceptions=False
:是否返回异常,默认不返回
返回值说明:
- 返回结果为列表,列表中包含所有协程或 future 对象的返回结果
- 如果
return_exceptions=True
,则返回结果为列表,列表中包含所有协程或 future 对象的返回结果,包括异常
import asyncio
import time
async def task(name, duration):
"""模拟一个异步任务"""
print(f"任务 {name} 开始")
await asyncio.sleep(duration)
print(f"任务 {name} 完成")
return f"结果-{name}"
async def main():
# 方法1: 使用 gather 并发执行
start = time.time()
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
print(f"结果: {results}")
print(f"总耗时: {time.time() - start:.2f}秒")
# 总耗时约3秒(最长任务的时间),而非6秒
asyncio.run(main())
# 输出:
# 任务 A 开始
# 任务 B 开始
# 任务 C 开始
# 任务 B 完成
# 任务 A 完成
# 任务 C 完成
# 结果: ['结果-A', '结果-B', '结果-C']
# 总耗时: 3.00秒
asyncio.create_task()
使用 asyncio.gather()
或 asyncio.create_task()
可以并发执行多个协程。
函数签名:
asyncio.create_task(coro, *, name=None)
参数说明:
coro
:协程对象name=None
:任务名称,默认不设置
返回值说明:
- 返回结果为任务对象
import asyncio
import time
async def task(name, duration):
"""模拟一个异步任务"""
print(f"任务 {name} 开始")
await asyncio.sleep(duration)
print(f"任务 {name} 完成")
return f"结果-{name}"
async def main2():
start = time.time()
# 创建任务,立即开始执行
task1 = asyncio.create_task(task("X", 2))
task2 = asyncio.create_task(task("Y", 1))
task3 = asyncio.create_task(task("Z", 3))
# 等待所有任务完成
result1 = await task1
result2 = await task2
result3 = await task3
print(f"结果: {result1}, {result2}, {result3}")
print(f"总耗时: {time.time() - start:.2f}秒")
asyncio.run(main2())