Python 协程

介绍

Python 中的协程是通过 asyncawait 关键字实现的,它们使你能够编写并发代码,就像编写普通的同步代码一样。以下是一个简单的示例,说明如何使用协程来异步执行任务。

首先,确保你有 Python 3.5 或更高的版本,因为 asyncawait 是从 Python 3.5 开始引入的。

基本用法

单个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import time

# 这是一个异步函数,通过 async 关键字定义
async def fetch_data(delay):
print(f"开始获取数据(耗时:{delay}秒)...")
# 模拟 IO 任务,比如网络请求,使用 asyncio.sleep 替代 time.sleep
await asyncio.sleep(delay)
print("数据获取完成!")
return {'data': 'some data'} # 模拟返回的数据

# 这是另一个异步函数,用于处理数据
async def process_data(data):
print("开始处理数据...")
await asyncio.sleep(2) # 模拟数据处理所需的时间
print(f"数据处理完成: {data}")

# 主函数,用于运行协程
async def main():
# 启动 fetch_data 协程,等待它完成
data = await fetch_data(3)
# 接着处理数据
await process_data(data)

# 运行主函数协程
asyncio.run(main())

在这个示例中,fetch_data 函数模拟了一个需要 3 秒钟完成的数据获取任务,它使用 asyncio.sleep 来异步等待,而不会阻塞整个事件循环。process_data 函数模拟对获取的数据进行处理,耗时 2 秒。

最后,在 main 函数中,我们使用 await 来调用这些异步函数,这样可以一步一步地执行它们,asyncio.run(main()) 用于启动事件循环并运行 main 函数。

输出结果如下,展示了程序的执行顺序:

1
2
3
4
开始获取数据(耗时:3秒)...
数据获取完成!
开始处理数据...
数据处理完成: {'data': 'some data'}

此例子展示了协程的基本用法,但实际上协程的威力在于能够使多个 IO 密集型任务并发运行,大大提高程序效率。例如,你可以创建多个fetch_data任务并同时运行它们,然后等待所有结果准备就绪,从而充分利用等待时间。

多个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

# 定义异步函数
async def fetch_data(delay, data_id):
print(f"开始获取数据 {data_id}(耗时:{delay}秒)...")
await asyncio.sleep(delay) # 模拟等待 IO 操作
print(f"数据 {data_id} 获取完成!")
return {data_id: 'some data'} # 想象成从网上获得了一些数据

# 主函数,用于运行协程
async def main():
# 创建三个 fetch_data 协程任务
task1 = fetch_data(3, 'Data1')
task2 = fetch_data(2, 'Data2')
task3 = fetch_data(1, 'Data3')

# 使用 asyncio.gather 并行运行所有任务
results = await asyncio.gather(task1, task2, task3)

# results 为所有任务的返回值列表
print(f"所有数据已获取:{results}")

# 运行主函数协程
asyncio.run(main())

在这段代码中,fetch_data 函数被修改,增加了一个 data_id 参数,以便区分不同的任务。main 函数中,我们创建了三个 fetch_data 协程,每个都有不同的等待时间和数据 ID。然后使用 asyncio.gather 一次性启动所有协程任务。asyncio.gather 会等待传递给它的所有协程完成,并返回它们的结果(按传递给 asyncio.gather 的顺序)。

假设每个 fetch_data 的执行耗时都不一样,因为 asyncio.gather 运行它们是并发执行的,所以总的执行时间将是三个协程中耗时最长的那个,而不是它们的总和。

运行上述代码,你将看到所有的 fetch_data 几乎同时开始,但按照它们各自的耗时依次完成。当所有协程都执行完毕后,main 函数打印出所有数据已获取的信息和结果列表。

asyncio.get_event_loop()

在Python 3.7之前,asyncio.get_event_loop()方法经常用来获取当前上下文的事件循环。从Python 3.7开始,官方文档推荐使用asyncio.run()来运行最顶层的协程,并且在协程中通常不需要直接与事件循环交互。然而,在某些特定情况下或者是Python 3.7之前的版本,如果需要显式访问事件循环,asyncio.get_event_loop()仍然可以使用。

以下是一个例子,展示了在Python 3.7之前如何使用asyncio.get_event_loop()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def periodic():
while True:
print('Hello every two seconds')
await asyncio.sleep(2)

loop = asyncio.get_event_loop() # 获取当前上下文中的事件循环
try:
asyncio.ensure_future(periodic()) # 将协程包装成Task并安排其在事件循环中运行
loop.run_forever() # 运行事件循环,直到stop()被调用。事件循环会持续运行周期性的periodic()函数
except KeyboardInterrupt:
pass
finally:
loop.close() # 关闭事件循环

在上面的代码中,首先通过asyncio.get_event_loop()获取了当前上下文的事件循环。接着用asyncio.ensure_future()periodic这个协程函数封装为Task并安排在事件循环中运行。run_forever()方法开始无限循环地运行事件循环,知道loop.stop()被调用。当想要中止程序时(例如通过KeyboardInterrupt,常见于按下Ctrl+C),事件循环通过调用finally语句块中的loop.close()来停止并关闭。

从Python 3.7及以后版本,你可以简化上面的代码,使用asyncio.run()代替:

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

async def periodic():
while True:
print('Hello every two seconds')
await asyncio.sleep(2)

async def main():
task = asyncio.create_task(periodic())
await task # 这个例子中,这行实际上永远不会结束,因为periodic是无限循环的。

# 直接运行main协程方法而不需要获取事件循环
asyncio.run(main())

在此重构后的代码中,我们定义了一个main异步函数,然后使用asyncio.run(main())启动事件循环并执行main。这样就避免了直接与事件循环API交互的需要。

asyncio.create_task()

是用来调度执行一个协程任务(coroutine)的函数。当你有一个协程函数,且你想要它在后台并发(asynchronously)运行时,你可以通过 asyncio.create_task() 函数来创建一个任务(Task)。

任务是对协程的一种包装,它允许协程在事件循环中并发执行。当一个任务被创建时,它将会被立即调度执行,而不需要等待手动的挂起操作(例如使用 await 关键字)。这使得你可以在不阻塞当前执行的协程的情况下启动新任务。

下面是使用 asyncio.create_task() 的一个简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def some_coroutine_function():
print('Do something asynchronously')
await asyncio.sleep(1) # 模拟I/O操作
print('Done')

async def main():
# 创建并开始一个任务,这个任务会运行 some_coroutine_function 协程
task = asyncio.create_task(some_coroutine_function())

# 此处可以执行其他代码,任务将在后台并发运行
print('Other code running concurrently')

# 稍后,也许在某个特定的时刻,你可能需要等待任务完成
await task

# 运行main函数来执行协程
asyncio.run(main())

在上述代码中,some_coroutine_function 是一个协程函数。在 main 函数中,我们调用 asyncio.create_task() 来创建一个任务,这个任务包装了 some_coroutine_function 协程。some_coroutine_function 将会并发执行,这意味着 main 功能中 print('Other code running concurrently') 的执行不会被延迟。我们还可以在需要的时候通过 await task 来等待任务完成。

asyncio.create_task() 是一个很方便的方式来实现代码的并行执行,特别是当你有多个协程需要运行,并且不希望它们相互阻塞时,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

# 定义第一个异步任务函数
async def task1():
for i in range(3):
print('Task 1 - Iteration {}'.format(i))
await asyncio.sleep(1)

# 定义第二个异步任务函数
async def task2():
for i in range(3):
print('Task 2 - Iteration {}'.format(i))
await asyncio.sleep(1)

# 定义主函数,它将创建并运行上面定义的两个任务
async def main():
# 创建任务但不等待它完成
task_1 = asyncio.create_task(task1())
task_2 = asyncio.create_task(task2())

# 等待任务1完成
await task_1
# 等待任务2完成
await task_2

# 使用asyncio.run()来运行主函数,并启动所有协程
asyncio.run(main())

在上述代码中:

  • task1()task2() 都是异步函数,它们使用await asyncio.sleep(1)模拟异步操作,例如I/O操作,使得每个迭代之间有1秒的延迟。
  • main()内部,我们通过asyncio.create_task()函数创建了两个任务,并立刻启动它们。这两个任务会在事件循环中并发(同时)执行。
  • 最后我们使用await,在main()函数中等待每个任务完成。请注意,在任务创建后,它们已经立即开始执行了,即使我们在main()后面的代码还没有到达await

在运行这段代码时,你会注意到”Task 1”和”Task 2”的输出是交错的,因为两个任务是同时运行的,它们各自的迭代不会相互阻塞。

asyncio.gather()

asyncio.gather() 是一个用来并发运行多个协程的函数,并且等待所有协程完成。这个函数接受一系列的协程或future对象,然后同时(concurrently)运行它们。当所有的协程均运行完毕后,gather() 会收集并返回所有协程的结果。

利用 asyncio.gather() 来重写上面的例子代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

async def task1():
for i in range(3):
print('Task 1 - Iteration {}'.format(i))
await asyncio.sleep(1)

async def task2():
for i in range(3):
print('Task 2 - Iteration {}'.format(i))
await asyncio.sleep(1)

async def main():
# 使用 asyncio.gather() 来并发运行两个协程
await asyncio.gather(
task1(),
task2()
)

# 使用 asyncio.run() 运行主函数
asyncio.run(main())

这段代码与前面使用 asyncio.create_task() 的例子相比,最大的不同在于这里我们直接将两个协程传递给了 asyncio.gather()。因此,我们不需要显式创建任务,asyncio.gather() 内部会处理这些。

此种方式也会使得”Task 1”和”Task 2”的输出是交错的,因为task1()task2()协程会被并发执行。

asyncio.gather() 不仅能异步执行协程,还能等待所有协程完成并且可以获取每个协程返回的结果。这在你需要收集多个协程返回值时特别有用

asyncio另外比较重要的函数

asyncio 模块提供了许多有用的函数和特性来编写异步代码。除了 asyncio.create_task()asyncio.gather() 之外,以下是一些其他重要的 asyncio 函数:

  1. asyncio.run(coro): 这是启动高层级异步程序的主入口点。它运行传入的协程(coroutine),负责管理事件循环,最后关闭事件循环。从 Python 3.7 版本开始引入。
  2. asyncio.sleep(delay, result=None): 休眠指定的秒数。result 参数是在休眠结束时返回的值,通常用于模拟 IO 绑定操作或者简单的进行时间延迟。
  3. asyncio.wait(fs, *, timeout=None, return_when=ALL_COMPLETED): 并发运行传入的协程或 Future 实例(fs),直到满足指定条件。return_when 参数可以自定义为 FIRST_COMPLETED, FIRST_EXCEPTION, 或 ALL_COMPLETED 等。
  4. asyncio.wait_for(fut, timeout, *, loop=None): 对一个协程或 future 实例设置超时时间。如果超出设定时间,任务将被取消。
  5. asyncio.as_completed(fs, *, timeout=None, loop=None): 用于异步迭代传入的协程或 future 实例,一旦其中一个完成了,它就会被 yield 出来。
  6. asyncio.get_event_loop(): 获取当前上下文的事件循环。
  7. asyncio.set_event_loop(loop): 设置当前上下文的事件循环。
  8. asyncio.open_connection(host=None, port=None, , loop=None, limit=None, kwds): 打开 TCP 连接并回传一个二元组 (reader, writer),这可以用来进行非阻塞的网络通信。
  9. asyncio.run_coroutine_threadsafe(coro, loop): 安排在由 loop 指定的事件循环中运行 coro,并且适合于从不同线程调用(线程安全)。返回一个 concurrent.futures.Future,可以在其他线程中等待协程的结果。
  10. asyncio.Lock/asyncio.Event/asyncio.Condition/asyncio.Semaphore: 这些是 asyncio 版的同步原语(synchronization primitives) 类,可以用来在不同协程中进行同步。

以下是一个使用了 asyncio.waitasyncio.as_completed 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def my_coroutine(i):
await asyncio.sleep(i)
return f"coroutine {i} has finished"

async def main():
# 创建一个协程列表
coroutines = [my_coroutine(i) for i in range(1, 4)]

# 使用 asyncio.wait 并发运行,并等待所有完成
done, pending = await asyncio.wait(coroutines)
for task in done:
print(task.result())

# 使用 asyncio.as_completed 获取每个协程的完成顺序
for coro in asyncio.as_completed(coroutines):
result = await coro
print(result)

# 运行 main 函数
asyncio.run(main())

这个例子首先展示了如何使用 asyncio.wait 并发运行协程并等待所有完成。接着,示例展示了如何使用 asyncio.as_completed 按完成顺序获取协程结果。

python的协程坑很多

在协程中用到aiohttp去获取数据,然后分析,分析是耗时的,这有可能导致aiohttp自动超时!

在异步编程中,特别是在使用asyncio时,所有的协程都是在一个线程的事件循环上运行的。这个事件循环负责调度和执行所有的协程。当一个协程正在运行时,它必须在某个点上让步给其它协程,使得它们也有机会执行。这通常是通过await表达式实现的,此时事件循环可以挂起当前协程,并恢复另一个。

如果一个协程中有一个长时间运行的非异步操作,比如复杂的同步数据处理或计算,它将保持对当前线程的占用,因为它没有暂停或让出控制权给事件循环。在这一长期操作完成前,不会有其他任何协程得到执行——包括网络I/O操作。这意味着,如果你有一个执行http请求的协程等待被处理,它将被推迟,直到耗时的协程完成其操作并暂停或结束。

因此,如果你在请求响应到来后立即进行一个耗时的操作,并且你正在等待的另一个异步http请求(它依赖于事件循环来检测什么时候接收到响应数据),由于事件循环正忙于运行耗时的任务,请求可能会超时,因为它没有足够快地重新获得事件循环的控制权来处理响应。

要避免这种情况,你可以将耗时的处理操作移出关键的异步路径,比如使用异步的方式来处理数据、把它们放入一个队列供另一个协程处理,或者使用 asyncio.run_in_executor 将它们移入另一个线程或进程。这样就使事件循环可以继续处理其他协程,比如发起和接收http请求,从而避免了超时问题。


Python 协程
https://luffy997.github.io/2025/04/30/Python-协程/
作者
Luffy997
发布于
2025年4月30日
许可协议