Skip to main content

asyncio

asyncio 模块

协程是编写并发代码的库,是构建 IO 密集型和高级结构化网络代码的最佳选择。

例程的运行方式是通过代码主动切换状态并等待处理,因此效率更高,语法也更详细。循环对象需要处于活动状态:创建、设置、提交、等待运行和停止。

例行程序的最佳数量取决于内存使用情况。

asyncio 模块包含了一些工具,用于编写异步代码。

协程的工作原理是事件循环,事件循环是一个无限循环,它等待事件并执行它们。

每次任务会被挂起至事件循环队列中,然后按顺序执行。

await 关键字用于挂起协程,直到它被调用。

async 关键字用于定义协程。

asyncio 模块用于实现异步编程。

asyncio:asyncio Multiprocessing Module Code Documentation

一些常用函数:

import asyncio

async def demo_asyncio_functions():
# 1. asyncio.sleep() - 异步睡眠
await asyncio.sleep(1)

# 2. asyncio.wait_for() - 设置超时
try:
result = await asyncio.wait_for(
asyncio.sleep(5),
timeout=2.0
)
except asyncio.TimeoutError:
print("操作超时")

# 3. asyncio.as_completed() - 按完成顺序获取结果
tasks = [
asyncio.create_task(asyncio.sleep(2, result="慢任务")),
asyncio.create_task(asyncio.sleep(1, result="快任务"))
]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"完成: {result}")
# 输出顺序: 快任务, 慢任务

asyncio.run(demo_asyncio_functions())

生成器函数与协程(注:函数)非常相似,它们 yield 多次,它们具有多个入口点,并且它们的执行可以被挂起。唯一的区别是生成器函数不能控制在它在 yield 后交给哪里继续执行,控制权总是转移到生成器的调用者

在 Python 创建协程时,task 是 future 的子类,所以 task 继承了 future 的属性和方法。几乎没有不同。

import asyncio

class TestA:
def __init__(self,loop) -> None:
self.loop = loop
asyncio.set_event_loop(loop=self.loop) # step 3.1

async def run_page(self,tid): # step 7
print(tid)
# 此处编写爬虫代码
return tid

async def close(self,):
for i in asyncio.all_tasks(): # step 9.1
i.cancel()
self.loop.stop() # step 9.2


def test():
get_async_loop = asyncio.new_event_loop() # step 1
asyncio.set_event_loop(get_async_loop) # step 2

async def spider(task_obj):
async_task = [asyncio.ensure_future(task_obj.run_page(1)),
asyncio.ensure_future(task_obj.run_page(2)),] # step 6
await asyncio.wait(async_task) # step 8

await task_obj.close() # step 9

task_obj = TestA(get_async_loop) #step 3
asyncio.run_coroutine_threadsafe(spider(task_obj), loop=get_async_loop) #step 4
get_async_loop.run_forever() # step 5

test()

asyncio.gather()

使用 asyncio.gather()asyncio.create_task() 可以并发执行多个协程。

函数签名:

  • asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) -> List[Any]

参数说明:

  • *coros_or_futures:协程或 future 对象
  • loop=None:事件循环,默认使用当前事件循环
  • return_exceptions=False:是否返回异常,默认不返回

返回值说明:

  • 返回结果为列表,列表中包含所有协程或 future 对象的返回结果
  • 如果return_exceptions=True,则返回结果为列表,列表中包含所有协程或 future 对象的返回结果,包括异常
import asyncio
import time

async def task(name, duration):
"""模拟一个异步任务"""
print(f"任务 {name} 开始")
await asyncio.sleep(duration)
print(f"任务 {name} 完成")
return f"结果-{name}"

async def main():
# 方法1: 使用 gather 并发执行
start = time.time()
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
print(f"结果: {results}")
print(f"总耗时: {time.time() - start:.2f}秒")
# 总耗时约3秒(最长任务的时间),而非6秒

asyncio.run(main())
# 输出:
# 任务 A 开始
# 任务 B 开始
# 任务 C 开始
# 任务 B 完成
# 任务 A 完成
# 任务 C 完成
# 结果: ['结果-A', '结果-B', '结果-C']
# 总耗时: 3.00秒

asyncio.create_task()

使用 asyncio.gather()asyncio.create_task() 可以并发执行多个协程。

函数签名:

  • asyncio.create_task(coro, *, name=None)

参数说明:

  • coro:协程对象
  • name=None:任务名称,默认不设置

返回值说明:

  • 返回结果为任务对象
import asyncio
import time

async def task(name, duration):
"""模拟一个异步任务"""
print(f"任务 {name} 开始")
await asyncio.sleep(duration)
print(f"任务 {name} 完成")
return f"结果-{name}"

async def main2():
start = time.time()

# 创建任务,立即开始执行
task1 = asyncio.create_task(task("X", 2))
task2 = asyncio.create_task(task("Y", 1))
task3 = asyncio.create_task(task("Z", 3))

# 等待所有任务完成
result1 = await task1
result2 = await task2
result3 = await task3

print(f"结果: {result1}, {result2}, {result3}")
print(f"总耗时: {time.time() - start:.2f}秒")

asyncio.run(main2())