asyncio
asyncio 简介
asyncio是Python 3.4 试验性引入的异步I/O框架(PEP 3156),提供了基于协程做异步I/O编写单线程并发代码的基础设施。其核心组件有事件循环(Event Loop)、协程(Coroutine)、任务(Task)、未来对象(Future)以及其他一些扩充和辅助性质的模块。
在引入asyncio的时候,还提供了一个装饰器@asyncio.coroutine用于装饰使用了yield from的函数,以标记其为协程。但并不强制使用这个装饰器。
虽然发展到 Python 3.4 时有了yield from的加持让协程更容易了,但是由于协程在Python中发展的历史包袱所致,很多人仍然弄不明白生成器和协程的联系与区别,也弄不明白yield 和 yield from 的区别。这种混乱的状态也违背Python之禅的一些准则。
于是Python设计者们又快马加鞭地在 3.5 中新增了async/await语法(PEP 492),对协程有了明确而显式的支持,称之为原生协程。async/await 和 yield from这两种风格的协程底层复用共同的实现,而且相互兼容。
在Python 3.6 中asyncio库“转正”,不再是实验性质的,成为标准库的正式一员。
asyncio重写爬虫
用 asyncio 重写一下之前写的例子:
import asyncio
import aiohttp
host = 'http://example.com'
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
async def fetch(url):
async with aiohttp.ClientSession(loop=loop) as session:
async with session.get(url) as response:
response = await response.read()
return response
if __name__ == '__main__':
import time
start = time.time()
loop = asyncio.get_event_loop()
tasks = [fetch(host + url) for url in urls_todo]
loop.run_until_complete(asyncio.gather(*tasks))
print(time.time() - start)
上述代码运行时间大概为0.36s
对比起之前的,变化很大:
- 没有了yield 或 yield from,而是async/await
- 没有了自造的loop(),取而代之的是asyncio.get_event_loop()
- 无需自己在socket上做异步操作,不用显式地注册和注销事件,aiohttp库已经代劳
- 没有了显式的 Future 和 Task,asyncio已封装
- 更少量的代码,更优雅的设计
和同步阻塞版的代码对比:
- 异步化
- 代码量相当(引入aiohttp框架后更少)
- 代码逻辑同样简单,跟同步代码一样的结构、一样的逻辑
- 接近10倍的性能提升
简单实现 asyncio
asyncio逻辑梳理
我们将之前的实现异步的代码抽取出来,主要就有 Future, Task, event_loop这三个东西。
from selectors import EVENT_READ, EVENT_WRITE, DefaultSelector
stopped = False
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for fn in self._callbacks:
fn(self)
def __iter__(self):
yield self
return self.result
class Task:
def __init__(self, coro):
self.coro = coro
f = Future()
f.set_result(None)
self.step(f) #激活 Task 包裹的生成器
def step(self, future):
try:
next_future = self.coro.send(future.result)
except StopIteration:
return
next_future.add_done_callback(self.step)
selector = DefaultSelector()
def loop():
while not stopped:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback()
这个代码当然是不能直接使用的,我们需要改造一下。
asyncio库里面的核心也是这三个东西,Future, Task 还有事件循环。
在开始之前,我们先使用 asyncio来写个小demo,根据这个demo来梳理一下asyncio的逻辑。
import asyncio
async def get_html(url):
print("开始获取HTML")
await asyncio.sleep(2)
print("结束获取HTML")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
func = get_html("https://www.baidu.com")
task = loop.create_task(func)
loop.run_until_complete(task)
在上述代码中,我们构建了一个异步函数 get_html 来模拟访问网页的过程。
在这个代码段中,首先执行的就是 get_event_loop 这个函数,这个函数的作用就是获取一个事件循环,不停的循环检测是否有事件准备好,如果检查到准备好的,就调用注册在事件上的回调函数,直到 stopped 置位时退出循环。 get_event_loop() 会返回一个事件循环类的实例,这个类继承于 BaseEventLoop。
之后执行了 create_task() 这个函数,来看看它都干嘛了。
在asyncio/base_events.py中找到BaseEventLoop定义,在BaseEventLoop类中有个create_task方法。
这个方法的核心代码就只有这俩。
def create_task(self, coro):
task = tasks.Task(coro, loop=self)
return task
就是创建了一个Task的实例,然后将传入的协程 coro 通过 Task 这个类中的逻辑进行一步一步驱动。
每个 Task 类实例都会包裹一个协程(coro),然后通过函数 step 中的 send 来驱动协程。这个协程经过 future 一步一步驱动起来。
在 Task 类中,初始化实例的时候将协程保存,然后通过 step 函数来启动协程。但是这里的 step 函数和上面我们抽取出来的不太一样,它多了很多参数检查和异常处理,然后就是,我们上面抽取出来的 Task 类在 init 的时候,就将整个协程驱动起来,但是在 asyncio 中,它不会马上调用 step 函数,而是在下一帧(下一次循环)中调用(_loop.call_soon 函数)。
这里调用 call_soon 函数就是将 Task 实例的 _step 函数添加到待执行的队列中去,这个函数也是定义在 asyncio/base_events.py 的 BaseEventLoop 类中的。
def _call_soon(self, callback, args):
handle = events.Handle(callback, args, self)
if handle._source_traceback:
del handle._source_traceback[-1]
self._ready.append(handle) # 事件添加到队列
return handle
它返回了一个 Handle 类的实例。这里的 Handle 类就是包裹了就绪事件的回调函数的,其中定义了一个run方法,就是直接执行回调函数,self._ready 保存着 Handle 类的实例,在 asyncio 中 loop 死循环不断检测是否有事件就绪,即检测 self._ready是否有为空,不为空就从其中弹出 Handle 实例,然后调用handle实例的run方法,其实就是执行注册在就绪事件上的回调函数。一旦有就绪事件,就调用其回调函数。
现在在我们写的那个小 demo 中,已经通过 task = loop.create_task(func) 创建了一个 task 实例,该 task 实例包裹了我们自己定义的协程 func ,并且 在task 初始化的时候在 __init__
函数中通过 call_soon
通知下一次循环立即执行 task 的_step
函数来激活cora协程。接下来就是run_until_complete
函数了。
run_until_complete
函数同样定义在asyncio/base_events.py的BaseEventLoop类中。这个函数中就有 loop 的死循环。(节选的代码,删除了部分代码)
def run_until_complete(self, future):
future = tasks.ensure_future(future, loop=self) # ensure_future,即,确保是future。返回的是future(task也是future)
future.add_done_callback(_run_until_complete_cb) # 用来结束循环
try:
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')
return future.result()
函数首先确保传递进来的参数是future,Task 是继承 Future的,所以 task 也是 future。我们外面传进来的参数是个task实例,所以这个函数调用返回的其实就是本身(传进去是啥返回就是啥),然后给我们传进来的task实例通过调用add_done_callback添加_run_until_complete_cb回调函数,这个回调函数比较关键,run_until_complete的做的最重要的事就是给传进来的task实例添加这个回调,点进_run_until_complete_cb,可以看到就是调用了loop的stop函数,这个的意义就是,当我们传进来的task包裹的协程运行结束后,就调用这个回调,跳出循环(就是相当于我们抽取出来的代码中的stopped变量的作用),否则死循环就真的是死循环了,永远跳不出。
之后就是真的死循环,run forever。
关键代码
def run_forever(self):try:
events._set_running_loop(self)
while True:
self._run_once()
if self._stopping:
break
finally:
...
这个函数不断的调用_run_once(),就像我们抽取出来的loop函数中不断地调用下面这段代码:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback()
而在 _run_once()中:
else:
event_list = self._selector.select(timeout) # 筛选就绪事件,将其回调添加到self._ready中
self._process_events(event_list) # 该函数具体实现在selector_events.py中
这里也就是选出就绪事件,然后添加到self._ready队列中,随后执行。在_run_once()的尾部,我们看到如下代码:
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
这里就是调用就绪事件的回调函数的执行。先看_ready队列中是否有待处理的Handle实例,如果有,那就一个一个执行,handle中的_run()方法就是执行就绪事件的回调函数。至此,就把我们抽取出来的中的loop()函数的逻辑对应到了asyncio源码的循环之中。
最后来看看Future
正如我们上面抽取的代码中的Future:
def __iter__(self):
yield self
return self.result # 在Task.step中send(result)的时候再次调用这个生成器,但是此时会抛出stopInteration异常,并且把self.result返回
yield的出现使得iter函数变成一个生成器,生成器本身就有next方法,所以不需要额外实现。yield from x语句首先调用iter(x)获取一个迭代器(生成器也是迭代器)。
这里的future和asyncio中的future,结构是一样的,功能也类似。最后运行起来的时候就是生成器一层嵌套一层。
总结
以上介绍了Python异步编程和自己简单实现一个asyncio。如果还有不太理解,或者是觉得文章在某些地方还有需要提升的地方,以及有不同观点的地方,欢迎在下面留言交流,帮助我一起把这篇文章变得更好。
欢迎转载,但请注明出处。