Python 协程
介绍
Python 中的协程是通过 async
和 await
关键字实现的,它们使你能够编写并发代码,就像编写普通的同步代码一样。以下是一个简单的示例,说明如何使用协程来异步执行任务。
首先,确保你有 Python 3.5 或更高的版本,因为 async
和 await
是从 Python 3.5 开始引入的。
基本用法
单个任务
1 |
|
在这个示例中,fetch_data
函数模拟了一个需要 3 秒钟完成的数据获取任务,它使用 asyncio.sleep
来异步等待,而不会阻塞整个事件循环。process_data
函数模拟对获取的数据进行处理,耗时 2 秒。
最后,在 main
函数中,我们使用 await
来调用这些异步函数,这样可以一步一步地执行它们,asyncio.run(main())
用于启动事件循环并运行 main
函数。
输出结果如下,展示了程序的执行顺序:
1 |
|
此例子展示了协程的基本用法,但实际上协程的威力在于能够使多个 IO 密集型任务并发运行,大大提高程序效率。例如,你可以创建多个fetch_data
任务并同时运行它们,然后等待所有结果准备就绪,从而充分利用等待时间。
多个任务
1 |
|
在这段代码中,fetch_data
函数被修改,增加了一个 data_id
参数,以便区分不同的任务。main
函数中,我们创建了三个 fetch_data
协程,每个都有不同的等待时间和数据 ID。然后使用 asyncio.gather
一次性启动所有协程任务。asyncio.gather
会等待传递给它的所有协程完成,并返回它们的结果(按传递给 asyncio.gather
的顺序)。
假设每个 fetch_data
的执行耗时都不一样,因为 asyncio.gather
运行它们是并发执行的,所以总的执行时间将是三个协程中耗时最长的那个,而不是它们的总和。
运行上述代码,你将看到所有的 fetch_data
几乎同时开始,但按照它们各自的耗时依次完成。当所有协程都执行完毕后,main
函数打印出所有数据已获取的信息和结果列表。
asyncio.get_event_loop()
在Python 3.7之前,asyncio.get_event_loop()
方法经常用来获取当前上下文的事件循环。从Python 3.7开始,官方文档推荐使用asyncio.run()
来运行最顶层的协程,并且在协程中通常不需要直接与事件循环交互。然而,在某些特定情况下或者是Python 3.7之前的版本,如果需要显式访问事件循环,asyncio.get_event_loop()
仍然可以使用。
以下是一个例子,展示了在Python 3.7之前如何使用asyncio.get_event_loop()
:
1 |
|
在上面的代码中,首先通过asyncio.get_event_loop()
获取了当前上下文的事件循环。接着用asyncio.ensure_future()
将periodic
这个协程函数封装为Task
并安排在事件循环中运行。run_forever()
方法开始无限循环地运行事件循环,知道loop.stop()
被调用。当想要中止程序时(例如通过KeyboardInterrupt
,常见于按下Ctrl+C
),事件循环通过调用finally
语句块中的loop.close()
来停止并关闭。
从Python 3.7及以后版本,你可以简化上面的代码,使用asyncio.run()
代替:
1 |
|
在此重构后的代码中,我们定义了一个main
异步函数,然后使用asyncio.run(main())
启动事件循环并执行main
。这样就避免了直接与事件循环API交互的需要。
asyncio.create_task()
是用来调度执行一个协程任务(coroutine)的函数。当你有一个协程函数,且你想要它在后台并发(asynchronously)运行时,你可以通过 asyncio.create_task()
函数来创建一个任务(Task)。
任务是对协程的一种包装,它允许协程在事件循环中并发执行。当一个任务被创建时,它将会被立即调度执行,而不需要等待手动的挂起操作(例如使用 await
关键字)。这使得你可以在不阻塞当前执行的协程的情况下启动新任务。
下面是使用 asyncio.create_task()
的一个简单示例:
1 |
|
在上述代码中,some_coroutine_function
是一个协程函数。在 main
函数中,我们调用 asyncio.create_task()
来创建一个任务,这个任务包装了 some_coroutine_function
协程。some_coroutine_function
将会并发执行,这意味着 main
功能中 print('Other code running concurrently')
的执行不会被延迟。我们还可以在需要的时候通过 await task
来等待任务完成。
asyncio.create_task()
是一个很方便的方式来实现代码的并行执行,特别是当你有多个协程需要运行,并且不希望它们相互阻塞时,如下:
1 |
|
在上述代码中:
task1()
和task2()
都是异步函数,它们使用await asyncio.sleep(1)
模拟异步操作,例如I/O操作,使得每个迭代之间有1秒的延迟。- 在
main()
内部,我们通过asyncio.create_task()
函数创建了两个任务,并立刻启动它们。这两个任务会在事件循环中并发(同时)执行。 - 最后我们使用
await
,在main()
函数中等待每个任务完成。请注意,在任务创建后,它们已经立即开始执行了,即使我们在main()
后面的代码还没有到达await
。
在运行这段代码时,你会注意到”Task 1”和”Task 2”的输出是交错的,因为两个任务是同时运行的,它们各自的迭代不会相互阻塞。
asyncio.gather()
asyncio.gather()
是一个用来并发运行多个协程的函数,并且等待所有协程完成。这个函数接受一系列的协程或future对象,然后同时(concurrently)运行它们。当所有的协程均运行完毕后,gather()
会收集并返回所有协程的结果。
利用 asyncio.gather()
来重写上面的例子代码如下:
1 |
|
这段代码与前面使用 asyncio.create_task()
的例子相比,最大的不同在于这里我们直接将两个协程传递给了 asyncio.gather()
。因此,我们不需要显式创建任务,asyncio.gather()
内部会处理这些。
此种方式也会使得”Task 1”和”Task 2”的输出是交错的,因为task1()
和task2()
协程会被并发执行。
asyncio.gather()
不仅能异步执行协程,还能等待所有协程完成并且可以获取每个协程返回的结果。这在你需要收集多个协程返回值时特别有用。
asyncio另外比较重要的函数
asyncio
模块提供了许多有用的函数和特性来编写异步代码。除了 asyncio.create_task()
和 asyncio.gather()
之外,以下是一些其他重要的 asyncio
函数:
- asyncio.run(coro): 这是启动高层级异步程序的主入口点。它运行传入的协程(coroutine),负责管理事件循环,最后关闭事件循环。从 Python 3.7 版本开始引入。
- asyncio.sleep(delay, result=None): 休眠指定的秒数。
result
参数是在休眠结束时返回的值,通常用于模拟 IO 绑定操作或者简单的进行时间延迟。 - asyncio.wait(fs, *, timeout=None, return_when=ALL_COMPLETED): 并发运行传入的协程或 Future 实例(fs),直到满足指定条件。
return_when
参数可以自定义为FIRST_COMPLETED
,FIRST_EXCEPTION
, 或ALL_COMPLETED
等。 - asyncio.wait_for(fut, timeout, *, loop=None): 对一个协程或 future 实例设置超时时间。如果超出设定时间,任务将被取消。
- asyncio.as_completed(fs, *, timeout=None, loop=None): 用于异步迭代传入的协程或 future 实例,一旦其中一个完成了,它就会被 yield 出来。
- asyncio.get_event_loop(): 获取当前上下文的事件循环。
- asyncio.set_event_loop(loop): 设置当前上下文的事件循环。
- asyncio.open_connection(host=None, port=None, , loop=None, limit=None, kwds): 打开 TCP 连接并回传一个二元组
(reader, writer)
,这可以用来进行非阻塞的网络通信。 - asyncio.run_coroutine_threadsafe(coro, loop): 安排在由
loop
指定的事件循环中运行coro
,并且适合于从不同线程调用(线程安全)。返回一个concurrent.futures.Future
,可以在其他线程中等待协程的结果。 - asyncio.Lock/asyncio.Event/asyncio.Condition/asyncio.Semaphore: 这些是 asyncio 版的同步原语(synchronization primitives) 类,可以用来在不同协程中进行同步。
以下是一个使用了 asyncio.wait
和 asyncio.as_completed
的例子:
1 |
|
这个例子首先展示了如何使用 asyncio.wait
并发运行协程并等待所有完成。接着,示例展示了如何使用 asyncio.as_completed
按完成顺序获取协程结果。
python的协程坑很多
在协程中用到aiohttp去获取数据,然后分析,分析是耗时的,这有可能导致aiohttp自动超时!
在异步编程中,特别是在使用asyncio时,所有的协程都是在一个线程的事件循环上运行的。这个事件循环负责调度和执行所有的协程。当一个协程正在运行时,它必须在某个点上让步给其它协程,使得它们也有机会执行。这通常是通过await表达式实现的,此时事件循环可以挂起当前协程,并恢复另一个。
如果一个协程中有一个长时间运行的非异步操作,比如复杂的同步数据处理或计算,它将保持对当前线程的占用,因为它没有暂停或让出控制权给事件循环。在这一长期操作完成前,不会有其他任何协程得到执行——包括网络I/O操作。这意味着,如果你有一个执行http请求的协程等待被处理,它将被推迟,直到耗时的协程完成其操作并暂停或结束。
因此,如果你在请求响应到来后立即进行一个耗时的操作,并且你正在等待的另一个异步http请求(它依赖于事件循环来检测什么时候接收到响应数据),由于事件循环正忙于运行耗时的任务,请求可能会超时,因为它没有足够快地重新获得事件循环的控制权来处理响应。
要避免这种情况,你可以将耗时的处理操作移出关键的异步路径,比如使用异步的方式来处理数据、把它们放入一个队列供另一个协程处理,或者使用 asyncio.run_in_executor 将它们移入另一个线程或进程。这样就使事件循环可以继续处理其他协程,比如发起和接收http请求,从而避免了超时问题。