简单理解python异步编程与asyncio实现(四)

asyncio

asyncio 简介

asyncio是Python 3.4 试验性引入的异步I/O框架(PEP 3156),提供了基于协程做异步I/O编写单线程并发代码的基础设施。其核心组件有事件循环(Event Loop)、协程(Coroutine)、任务(Task)、未来对象(Future)以及其他一些扩充和辅助性质的模块。

在引入asyncio的时候,还提供了一个装饰器@asyncio.coroutine用于装饰使用了yield from的函数,以标记其为协程。但并不强制使用这个装饰器。

虽然发展到 Python 3.4 时有了yield from的加持让协程更容易了,但是由于协程在Python中发展的历史包袱所致,很多人仍然弄不明白生成器和协程的联系与区别,也弄不明白yield 和 yield from 的区别。这种混乱的状态也违背Python之禅的一些准则。

于是Python设计者们又快马加鞭地在 3.5 中新增了async/await语法(PEP 492),对协程有了明确而显式的支持,称之为原生协程。async/await 和 yield from这两种风格的协程底层复用共同的实现,而且相互兼容。

在Python 3.6 中asyncio库“转正”,不再是实验性质的,成为标准库的正式一员。

asyncio重写爬虫

用 asyncio 重写一下之前写的例子:

import asyncio
import aiohttp

host = 'http://example.com'
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}

async def fetch(url):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url) as response:
            response = await response.read()
            return response

if __name__ == '__main__':
    import time
    start = time.time()
    loop = asyncio.get_event_loop()
    tasks = [fetch(host + url) for url in urls_todo]
    loop.run_until_complete(asyncio.gather(*tasks))
    print(time.time() - start)

上述代码运行时间大概为0.36s

对比起之前的,变化很大:

和同步阻塞版的代码对比:

简单实现 asyncio

asyncio逻辑梳理

我们将之前的实现异步的代码抽取出来,主要就有 Future, Task, event_loop这三个东西。

from selectors import EVENT_READ, EVENT_WRITE, DefaultSelector

stopped = False

class Future:

    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self
        return self.result

class Task:

    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)  #激活 Task 包裹的生成器

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

selector = DefaultSelector()

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

这个代码当然是不能直接使用的,我们需要改造一下。

asyncio库里面的核心也是这三个东西,Future, Task 还有事件循环。

在开始之前,我们先使用 asyncio来写个小demo,根据这个demo来梳理一下asyncio的逻辑。

import asyncio

async def get_html(url):
    print("开始获取HTML")
    await asyncio.sleep(2)
    print("结束获取HTML")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    func = get_html("https://www.baidu.com")
    task = loop.create_task(func)
    loop.run_until_complete(task)

在上述代码中,我们构建了一个异步函数 get_html 来模拟访问网页的过程。

在这个代码段中,首先执行的就是 get_event_loop 这个函数,这个函数的作用就是获取一个事件循环,不停的循环检测是否有事件准备好,如果检查到准备好的,就调用注册在事件上的回调函数,直到 stopped 置位时退出循环。 get_event_loop() 会返回一个事件循环类的实例,这个类继承于 BaseEventLoop。

之后执行了 create_task() 这个函数,来看看它都干嘛了。

在asyncio/base_events.py中找到BaseEventLoop定义,在BaseEventLoop类中有个create_task方法。

这个方法的核心代码就只有这俩。

def create_task(self, coro):  
        task = tasks.Task(coro, loop=self)
        return task

就是创建了一个Task的实例,然后将传入的协程 coro 通过 Task 这个类中的逻辑进行一步一步驱动。

每个 Task 类实例都会包裹一个协程(coro),然后通过函数 step 中的 send 来驱动协程。这个协程经过 future 一步一步驱动起来。

过程图

在 Task 类中,初始化实例的时候将协程保存,然后通过 step 函数来启动协程。但是这里的 step 函数和上面我们抽取出来的不太一样,它多了很多参数检查和异常处理,然后就是,我们上面抽取出来的 Task 类在 init 的时候,就将整个协程驱动起来,但是在 asyncio 中,它不会马上调用 step 函数,而是在下一帧(下一次循环)中调用(_loop.call_soon 函数)。

这里调用 call_soon 函数就是将 Task 实例的 _step 函数添加到待执行的队列中去,这个函数也是定义在 asyncio/base_events.py 的 BaseEventLoop 类中的。

    def _call_soon(self, callback, args):
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)  # 事件添加到队列
        return handle

它返回了一个 Handle 类的实例。这里的 Handle 类就是包裹了就绪事件的回调函数的,其中定义了一个run方法,就是直接执行回调函数,self._ready 保存着 Handle 类的实例,在 asyncio 中 loop 死循环不断检测是否有事件就绪,即检测 self._ready是否有为空,不为空就从其中弹出 Handle 实例,然后调用handle实例的run方法,其实就是执行注册在就绪事件上的回调函数。一旦有就绪事件,就调用其回调函数。

现在在我们写的那个小 demo 中,已经通过 task = loop.create_task(func) 创建了一个 task 实例,该 task 实例包裹了我们自己定义的协程 func ,并且 在task 初始化的时候在 __init__ 函数中通过 call_soon 通知下一次循环立即执行 task 的_step函数来激活cora协程。接下来就是run_until_complete函数了。

run_until_complete函数同样定义在asyncio/base_events.py的BaseEventLoop类中。这个函数中就有 loop 的死循环。(节选的代码,删除了部分代码)

def run_until_complete(self, future):
        future = tasks.ensure_future(future, loop=self)  # ensure_future,即,确保是future。返回的是future(task也是future)

        future.add_done_callback(_run_until_complete_cb)  # 用来结束循环
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

函数首先确保传递进来的参数是future,Task 是继承 Future的,所以 task 也是 future。我们外面传进来的参数是个task实例,所以这个函数调用返回的其实就是本身(传进去是啥返回就是啥),然后给我们传进来的task实例通过调用add_done_callback添加_run_until_complete_cb回调函数,这个回调函数比较关键,run_until_complete的做的最重要的事就是给传进来的task实例添加这个回调,点进_run_until_complete_cb,可以看到就是调用了loop的stop函数,这个的意义就是,当我们传进来的task包裹的协程运行结束后,就调用这个回调,跳出循环(就是相当于我们抽取出来的代码中的stopped变量的作用),否则死循环就真的是死循环了,永远跳不出。

之后就是真的死循环,run forever。

关键代码

def run_forever(self):try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            ...

这个函数不断的调用_run_once(),就像我们抽取出来的loop函数中不断地调用下面这段代码:

events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

而在 _run_once()中:

        else:
            event_list = self._selector.select(timeout)  # 筛选就绪事件,将其回调添加到self._ready中
        self._process_events(event_list)  # 该函数具体实现在selector_events.py中

这里也就是选出就绪事件,然后添加到self._ready队列中,随后执行。在_run_once()的尾部,我们看到如下代码:

ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

这里就是调用就绪事件的回调函数的执行。先看_ready队列中是否有待处理的Handle实例,如果有,那就一个一个执行,handle中的_run()方法就是执行就绪事件的回调函数。至此,就把我们抽取出来的中的loop()函数的逻辑对应到了asyncio源码的循环之中。

最后来看看Future

正如我们上面抽取的代码中的Future:

    def __iter__(self):
        yield self
        return self.result  # 在Task.step中send(result)的时候再次调用这个生成器,但是此时会抛出stopInteration异常,并且把self.result返回

yield的出现使得iter函数变成一个生成器,生成器本身就有next方法,所以不需要额外实现。yield from x语句首先调用iter(x)获取一个迭代器(生成器也是迭代器)。

这里的future和asyncio中的future,结构是一样的,功能也类似。最后运行起来的时候就是生成器一层嵌套一层。

总结

以上介绍了Python异步编程和自己简单实现一个asyncio。如果还有不太理解,或者是觉得文章在某些地方还有需要提升的地方,以及有不同观点的地方,欢迎在下面留言交流,帮助我一起把这篇文章变得更好。

欢迎转载,但请注明出处。