python异步IO(asyncio)协程实战
1.概念描述:
asyncio 是用来编写并发代码的库,使用async/await语法。
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择
event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
async定义一个协程,await用于挂起阻塞的异步调用接口。
2.定义一个协程并实例化
async def do_some_work(x): # 定义一个协程
print('Waiting: ', x)
return 'Done after {}s'.format(x)
coroutine = do_some_work(2) # 新建一个协程实例
3.创建一个task
task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task
4.定义并绑定回调
# def callback(future): # 定义一个回调函数,不传参数
# print('Callback: ', future.result()) # 打印协程的返回结果
def callback(t, future): # 定义回调函数,接收一个参数和协程实例对象
print('Callback:', t, future.result()) # 传入的参数和协程实例的执行结果
#task.add_done_callback(callback) # 添加并绑定回调函数
task.add_done_callback(functools.partial(callback, 2)) # 添加回调函数并在回调函数中传入参数
程序执行实例:
import time
import asyncio
import functools
now = lambda: time.time()
async def do_some_work(x): # 定义一个协程
print('Waiting: ', x)
return 'Done after {}s'.format(x)
# def callback(future): # 定义一个回调函数,不传参数
# print('Callback: ', future.result()) # 打印协程的返回结果
def callback(t, future): # 定义回调函数,接收一个参数和协程实例对象
print('Callback:', t, future.result()) # 传入的参数和协程实例的执行结果
start = now()
coroutine = do_some_work(2) # 新建一个协程实例
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine) # asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task
#task.add_done_callback(callback) # 添加回调函数
task.add_done_callback(functools.partial(callback, 2)) # 添加回调函数并在回调函数中传入参数
loop.run_until_complete(task) # 将协程实例注册到事件循环,并启动事件循环
print('Task ret: {}'.format(task.result())) # 获取协程实例运行的返回结果
print('TIME: ', now() - start)
5.阻塞和await
使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权
协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行
耗时的操作一般是一些IO操作,例如网络请求,文件读取等。
我们使用asyncio.sleep函数来模拟IO操作,协程的目的也是让这些IO操作异步化。
程序示例:
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('run in task1: ', x)
await asyncio.sleep(x) # 协程遇到await,事件循环将会挂起该协程,执行别的协程work2,sleep 模拟耗时操作
print("run in task1 end")
return 'Done after {}s'.format(x)
async def do_some_work2(x):
print('run in task2: ', x)
return 'Done2 after {}s'.format(x)
start = now()
coroutine = do_some_work(1)
coroutine2 = do_some_work2(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task2 = asyncio.ensure_future(coroutine2)
loop.run_until_complete(task)
loop.run_until_complete(task2)
print('Task result: ', task.result())
print('Task result: ', task2.result())
print('TIME: ', now() - start)
执行结果:
run in task1: 2
run in task2: 2
run in task1 end
Task ret: Done after 2s
Task ret: Done2 after 2s
TIME: 2.0031144618988037
6.asyncio实现并发
asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。创建多个协程的列表,然后将这些协程注册到事件循环中
程序示例:
import time
import asyncio
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
start = now()
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
print('TIME: ', now() - start)
执行结果:
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.006228923797607
结论分析:总时间为4s左右。4s的阻塞时间,足够前面两个协程执行完毕。如果是同步顺序的任务,那么至少需要7s,所有本程序中我们使用了aysncio实现了并发
7.协程嵌套
import asyncio
import time
import asyncio
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('TIME: ', now() - start)
执行结果:
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 2s
Task ret: Done after 4s
Task ret: Done after 1s
TIME: 3.9912283420562744
8.协程停止
上面见识了协程的几种常用的用法,都是协程围绕着事件循环进行的操作。future对象有几个状态:
- Pending
- Running
- Done
- Cancelled
创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消Cancelled
程序实例:
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
start = now()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
for task in asyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
print('TIME: ', now() - start)
启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt,然后通过循环asyncio.Task取消future
执行结果: