异步编程
info
异步编程是 Python 3.5+ 引入的重要特性,通过 async/await 语法提供了一种优雅的方式来处理并发操作。它特别适合 I/O 密集型任务,如网络请求、数据库查询等,可以显 著提高程序性能。
# 传统同步方式 - 总耗时 = 任务1 + 任务2 + 任务3
# 异步方式 - 总耗时 ≈ max(任务1, 任务2, 任务3)
async def main():
# 三个网络请求可以并发执行
results = await asyncio.gather(
fetch_data('url1'),
fetch_data('url2'),
fetch_data('url3')
)
异步编程
异步编程是一种编程范式,它允许程序在等待某些操作(如 I/O)完成时继续执行其他任务,而不是阻塞等待。
异步编程的优势:
- 高效并发:同时处理多个 I/O 操作,不需要多线程
- 资源节省:避免了线程切换的开销
- 代码优雅:使用 async/await 语法,避免回调地狱
- 性能提升:在 I/O 密集型任务中性能显著提升
适用场景:
- 网络请求(API 调用、爬虫)
- 数据库查询
- 文件 I/O 操作
- WebSocket 通信
warning
异步编程不适合 CPU 密集型任务。对于计算密集型操作,应该使用多进程(multiprocessing)。
async 和 await 关键字
async 关键字用于定义协程函数,await 关键字用于等待协程执行完成。
async 协程函数不能直接调用,最终都需要通过asyncio标准库调用。
import asyncio
async def hello():
await asyncio.sleep(1)
return "Hello, World!"
# 在另一个 async 函数内部使用 await 调用协程函数
async def main():
"""但最终还是需要回到asyncio.run()中执行"""
result = await hello()
print(result)
# 方式1: asyncio.run() - Python 3.7+ 推荐方式
asyncio.run(main())
# 方式2: asyncio.create_task() - 在运行的事件循环中创建任务
async def create_tasks():
task = asyncio.create_task(hello()) # 创建任务
# 可以在等待之前做其他事情
print("任务已创建,继续做其他事情...")
result = await task # 执行任务,并等待任务完成
print(result)
asyncio.run(create_tasks())
# 方式3: asyncio.gather() - 并发执行多个协程
async def run_multiple():
results = await asyncio.gather(hello(), hello(), hello())
print(results) # ['Hello, World!', 'Hello, World!', 'Hello, World!']
asyncio.run(run_multiple())
# 方式4: 手动管理事件循环
loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(hello())
print(result)
finally:
loop.close()
async for
异步迭代器使用 async for 语法,适用于需要异步获取数据的场景。
import asyncio
class AsyncRange:
"""异步范围迭代器"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.5) # 模拟异步操作
self.current += 1
return self.current - 1
async def main():
# 使用 async for 迭代
async for num in AsyncRange(0, 5):
print(f"获取数字: {num}")
asyncio.run(main())
# 输出:
# 获取数字: 0
# 获取数字: 1
# 获取数字: 2
# 获取数字: 3
# 获取数字: 4
async with
异步上下文管理器使用 async with 语法,用于管理异步资源。
import asyncio
class AsyncResource:
"""异步资源管理器"""
async def __aenter__(self):
print("获取资源...")
await asyncio.sleep(1)
print("资源已获取")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源...")
await asyncio.sleep(1)
print("资源已释放")
async def main():
async with AsyncResource() as resource:
print("使用资源")
await asyncio.sleep(0.5)
asyncio.run(main())
# 输出:
# 获取资源...
# 资源已获取
# 使用资源
# 释放资源...
# 资源已释放
内置函数
aiter函数、anext函数
aiter() 和 anext() 是异步版本的 iter() 和 next(),用于处理异步迭代器。
函数签名:
aiter(async_iterable) -> async_iteratoranext(async_iterator, default=None) -> awaitable
魔术方法与内置函数的关系:
| 魔术方法 | 内置函数 | 说明 |
|---|---|---|
__aiter__() | aiter() | 返回异步迭代器对象 |
__anext__() | anext() | 返回下一个异步迭代值 |
__iter__() | iter() | 同步版本:返回迭代器 |
__next__() | next() | 同步版本:返回下一个值 |
import asyncio
class AsyncCounter:
"""异步计数器"""
def __init__(self, max_count):
self.count = 0
self.max_count = max_count
def __aiter__(self):
# aiter(obj) 会调用 obj.__aiter__()
return self
async def __anext__(self):
# anext(obj) 会调用 obj.__anext__()
if self.count >= self.max_count:
raise StopAsyncIteration
await asyncio.sleep(0.5)
self.count += 1
return self.count
async def main():
counter = AsyncCounter(3)
# 使用 aiter 和 anext
async_iter = aiter(counter)
# 获取第一个值
value1 = await anext(async_iter)
print(f"值1: {value1}") # 值1: 1
# 获取第二个值
value2 = await anext(async_iter)
print(f"值2: {value2}") # 值2: 2
# 获取第三个值
value3 = await anext(async_iter)
print(f"值3: {value3}") # 值3: 3
# 使用默认值避免 StopAsyncIteration
value4 = await anext(async_iter, "没有更多数据")
print(f"值4: {value4}") # 值4: 没有更多数据
asyncio.run(main())
tip
理解魔术方法与内置函数的关系:
内置函数是对魔术方法的封装,提供了更友好的调用方式。
import asyncio
class MyAsyncIterable:
def __aiter__(self):
return self
async def __anext__(self):
# 返回下一个值...
pass
obj = MyAsyncIterable()
# 这两种方式是等价的:
async_iter1 = aiter(obj) # 推荐:使用内置函数
async_iter2 = obj.__aiter__() # 不推荐:直接调用魔术方法
# 同样,这两种方式也是等价的:
value1 = await anext(async_iter1) # 推荐
value2 = await async_iter2.__anext__() # 不推荐
为什么要用内置函数?
- 可读性更好:
aiter(obj)比obj.__aiter__()更清晰 - 遵循约定:Python 风格指南推荐使用内置函数
- 类型检查:内置函数会进行必要的检查和处理
- 向后兼容:内置函数可能包含额外的逻辑和优化
类比同步版本:
# 同步迭代器
my_list = [1, 2, 3]
# 推荐写法
it = iter(my_list) # 而不是 my_list.__iter__()
value = next(it) # 而不是 it.__next__()
# 异步迭代器
async def demo():
async_obj = MyAsyncIterable()
# 推荐写法
async_it = aiter(async_obj) # 而不是 async_obj.__aiter__()
value = await anext(async_it) # 而不是 await async_it.__anext__()
# 实际应用:异步数据流处理
import asyncio
async def async_data_stream():
"""模拟异步数据流"""
for i in range(5):
await asyncio.sleep(0.5)
yield f"数据-{i}"
async def process_stream():
# 使用 async for 处理数据流
async for data in async_data_stream():
print(f"处理: {data}")
# 等价于使用 aiter 和 anext
print("\n使用 aiter/anext:")
stream = aiter(async_data_stream())
while True:
try:
data = await anext(stream)
print(f"处理: {data}")
except StopAsyncIteration:
break
asyncio.run(process_stream())
标准库推荐
- webbrowser: 使用系统默认浏览器打开 URL。
- urllib: URL处理、打开、解析等。
- asyncio: 核心异步 IO 框架,提供事件循环、协程调度等能力。
- socket: 低层网络编程接口,也是许多高层网络库的基础。
- threading: 基于线程的并发模型,适合与异步 IO 结合处理少量 CPU 密集任务。
- multiprocessing: 使用多进程绕过 GIL,适合 CPU 密集型任务的并行计算。
- concurrent.futures: 提供统一的线程池/进程池接口(
ThreadPoolExecutor/ProcessPoolExecutor)。 - queue: 线程安全的队列,用于在线程/协程间传递任务与数据。
- subprocess: 启动子进程并与之通信,常用于异步地调用外部命令。