Skip to main content

异步编程

info

异步编程是 Python 3.5+ 引入的重要特性,通过 async/await 语法提供了一种优雅的方式来处理并发操作。它特别适合 I/O 密集型任务,如网络请求、数据库查询等,可以显著提高程序性能。

# 传统同步方式 - 总耗时 = 任务1 + 任务2 + 任务3
# 异步方式 - 总耗时 ≈ max(任务1, 任务2, 任务3)

async def main():
# 三个网络请求可以并发执行
results = await asyncio.gather(
fetch_data('url1'),
fetch_data('url2'),
fetch_data('url3')
)

PEP 492 – 协程与 async/await 语法

异步编程

异步编程是一种编程范式,它允许程序在等待某些操作(如 I/O)完成时继续执行其他任务,而不是阻塞等待。

异步编程的优势:

  • 高效并发:同时处理多个 I/O 操作,不需要多线程
  • 资源节省:避免了线程切换的开销
  • 代码优雅:使用 async/await 语法,避免回调地狱
  • 性能提升:在 I/O 密集型任务中性能显著提升

适用场景:

  • 网络请求(API 调用、爬虫)
  • 数据库查询
  • 文件 I/O 操作
  • WebSocket 通信
warning

异步编程不适合 CPU 密集型任务。对于计算密集型操作,应该使用多进程(multiprocessing)。

async 和 await 关键字

async 关键字用于定义协程函数,await 关键字用于等待协程执行完成。

async 协程函数不能直接调用,最终都需要通过asyncio标准库调用。

import asyncio

async def hello():
await asyncio.sleep(1)
return "Hello, World!"

# 在另一个 async 函数内部使用 await 调用协程函数
async def main():
"""但最终还是需要回到asyncio.run()中执行"""
result = await hello()
print(result)
# 方式1: asyncio.run() - Python 3.7+ 推荐方式
asyncio.run(main())

# 方式2: asyncio.create_task() - 在运行的事件循环中创建任务
async def create_tasks():
task = asyncio.create_task(hello()) # 创建任务
# 可以在等待之前做其他事情
print("任务已创建,继续做其他事情...")
result = await task # 执行任务,并等待任务完成
print(result)

asyncio.run(create_tasks())

# 方式3: asyncio.gather() - 并发执行多个协程
async def run_multiple():
results = await asyncio.gather(hello(), hello(), hello())
print(results) # ['Hello, World!', 'Hello, World!', 'Hello, World!']

asyncio.run(run_multiple())

# 方式4: 手动管理事件循环
loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(hello())
print(result)
finally:
loop.close()

async for

异步迭代器使用 async for 语法,适用于需要异步获取数据的场景。

import asyncio

class AsyncRange:
"""异步范围迭代器"""
def __init__(self, start, end):
self.current = start
self.end = end

def __aiter__(self):
return self

async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.5) # 模拟异步操作
self.current += 1
return self.current - 1

async def main():
# 使用 async for 迭代
async for num in AsyncRange(0, 5):
print(f"获取数字: {num}")

asyncio.run(main())
# 输出:
# 获取数字: 0
# 获取数字: 1
# 获取数字: 2
# 获取数字: 3
# 获取数字: 4

async with

异步上下文管理器使用 async with 语法,用于管理异步资源。

import asyncio

class AsyncResource:
"""异步资源管理器"""
async def __aenter__(self):
print("获取资源...")
await asyncio.sleep(1)
print("资源已获取")
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源...")
await asyncio.sleep(1)
print("资源已释放")

async def main():
async with AsyncResource() as resource:
print("使用资源")
await asyncio.sleep(0.5)

asyncio.run(main())
# 输出:
# 获取资源...
# 资源已获取
# 使用资源
# 释放资源...
# 资源已释放

内置函数

aiter函数、anext函数

aiter()anext() 是异步版本的 iter()next(),用于处理异步迭代器。

函数签名:

  • aiter(async_iterable) -> async_iterator
  • anext(async_iterator, default=None) -> awaitable

魔术方法与内置函数的关系:

魔术方法内置函数说明
__aiter__()aiter()返回异步迭代器对象
__anext__()anext()返回下一个异步迭代值
__iter__()iter()同步版本:返回迭代器
__next__()next()同步版本:返回下一个值
import asyncio

class AsyncCounter:
"""异步计数器"""
def __init__(self, max_count):
self.count = 0
self.max_count = max_count

def __aiter__(self):
# aiter(obj) 会调用 obj.__aiter__()
return self

async def __anext__(self):
# anext(obj) 会调用 obj.__anext__()
if self.count >= self.max_count:
raise StopAsyncIteration
await asyncio.sleep(0.5)
self.count += 1
return self.count

async def main():
counter = AsyncCounter(3)

# 使用 aiter 和 anext
async_iter = aiter(counter)

# 获取第一个值
value1 = await anext(async_iter)
print(f"值1: {value1}") # 值1: 1

# 获取第二个值
value2 = await anext(async_iter)
print(f"值2: {value2}") # 值2: 2

# 获取第三个值
value3 = await anext(async_iter)
print(f"值3: {value3}") # 值3: 3

# 使用默认值避免 StopAsyncIteration
value4 = await anext(async_iter, "没有更多数据")
print(f"值4: {value4}") # 值4: 没有更多数据

asyncio.run(main())
tip

理解魔术方法与内置函数的关系:

内置函数是对魔术方法的封装,提供了更友好的调用方式。

import asyncio

class MyAsyncIterable:
def __aiter__(self):
return self

async def __anext__(self):
# 返回下一个值...
pass

obj = MyAsyncIterable()

# 这两种方式是等价的:
async_iter1 = aiter(obj) # 推荐:使用内置函数
async_iter2 = obj.__aiter__() # 不推荐:直接调用魔术方法

# 同样,这两种方式也是等价的:
value1 = await anext(async_iter1) # 推荐
value2 = await async_iter2.__anext__() # 不推荐

为什么要用内置函数?

  1. 可读性更好aiter(obj)obj.__aiter__() 更清晰
  2. 遵循约定:Python 风格指南推荐使用内置函数
  3. 类型检查:内置函数会进行必要的检查和处理
  4. 向后兼容:内置函数可能包含额外的逻辑和优化

类比同步版本:

# 同步迭代器
my_list = [1, 2, 3]

# 推荐写法
it = iter(my_list) # 而不是 my_list.__iter__()
value = next(it) # 而不是 it.__next__()

# 异步迭代器
async def demo():
async_obj = MyAsyncIterable()

# 推荐写法
async_it = aiter(async_obj) # 而不是 async_obj.__aiter__()
value = await anext(async_it) # 而不是 await async_it.__anext__()
# 实际应用:异步数据流处理
import asyncio

async def async_data_stream():
"""模拟异步数据流"""
for i in range(5):
await asyncio.sleep(0.5)
yield f"数据-{i}"

async def process_stream():
# 使用 async for 处理数据流
async for data in async_data_stream():
print(f"处理: {data}")

# 等价于使用 aiter 和 anext
print("\n使用 aiter/anext:")
stream = aiter(async_data_stream())
while True:
try:
data = await anext(stream)
print(f"处理: {data}")
except StopAsyncIteration:
break

asyncio.run(process_stream())

标准库推荐

  • webbrowser: 使用系统默认浏览器打开 URL。
  • urllib: URL处理、打开、解析等。
  • asyncio: 核心异步 IO 框架,提供事件循环、协程调度等能力。
  • socket: 低层网络编程接口,也是许多高层网络库的基础。
  • threading: 基于线程的并发模型,适合与异步 IO 结合处理少量 CPU 密集任务。
  • multiprocessing: 使用多进程绕过 GIL,适合 CPU 密集型任务的并行计算。
  • concurrent.futures: 提供统一的线程池/进程池接口(ThreadPoolExecutor / ProcessPoolExecutor)。
  • queue: 线程安全的队列,用于在线程/协程间传递任务与数据。
  • subprocess: 启动子进程并与之通信,常用于异步地调用外部命令。