简单理解python异步编程与asyncio实现(三)

协程与asyncio

协程

协程(Co-routine),即是协作式的例程

它是非抢占式的多任务子例程的概括,可以允许有多个入口点在例程中确定的位置来控制程序的暂停与恢复执行。

例程是什么?编程语言定义的可被调用的代码段,为了完成某个特定功能而封装在一起的一系列指令。一般的编程语言都用称为函数或方法的代码结构来体现。

首先,要知道的是,无论是多进程,多线程还是协程,都是为了解决多任务同时进行的问题。而多任务系统实现的关键在于如何暂停当前任务,保存当前任务的上下文,选择下一个任务,恢复下一个任务的上下文 ,执行下一个任务。

对计算机的不同层次来说,上下文的含义也不一样。

进程的切换 需要 切换系统资源和指令,消耗时间最长。

线程的切换,不需要切换系统资源,只需要切换指令、线程堆栈。但这个过程也需要系统调用。

协程的切换都在用户空间内进行,不需要进行系统调用。

在Python中线程切换,是由 python 虚拟机控制,通过一个系统调用,来进行线程切换。协程的切换过程完全由程序自身控制。

协程优于线程的主要在于

Python中,协程有两种,一种无栈协程,python 中以 asyncio 为代表,一种有栈协程,python 中以 gevent 为代表。

yield 和 yield from

yield

目前最新的Python已经没有采用基于 yield 的协程了。但是为了更好的理解协程,先来一个简单的 yield 的小例子。

def fun_e():
    print('yield 1')
    yield 1
    print('yield 2')
    yield 2

gen = fun_e()
print('start')
a = gen.send(None)
print('生成器的第一个值', a)
b = gen.next(None)
print('生成器的第二个值', b)

上面代码的输出结果是

start
yield 1
生成器的第一个值 1
yield 2
生成器的第二个值 2

这里程序运行到第一个 yield 的时候,保存了函数的上下文之后便退出了,然后又通过 next 方法进入了这个函数,将刚刚保存的函数上下文恢复并继续运行。

一个协程程序的所有就是: 保存上下文 切换运行程序 恢复上下文 重新进入程序

Cpython中的上下文,被封装成了一个PyFrameObject的结构,也可以叫它栈帧。

源码:

typedef struct _frame {
    PyObject_VAR_HEAD
    struct _frame *f_back;      /* previous frame, or NULL */
    PyCodeObject *f_code;       /* code segment */
    PyObject *f_builtins;       /* builtin symbol table (PyDictObject) */
    PyObject *f_globals;        /* global symbol table (PyDictObject) */
    PyObject *f_locals;         /* local symbol table (any mapping) */
    PyObject **f_valuestack;    /* points after the last local */
    /* Next free slot in f_valuestack.  Frame creation sets to f_valuestack.
       Frame evaluation usually NULLs it, but a frame that yields sets it
       to the current stack top. */
    PyObject **f_stacktop;
    PyObject *f_trace;          /* Trace function */
    char f_trace_lines;         /* Emit per-line trace events? */
    char f_trace_opcodes;       /* Emit per-opcode trace events? */

    /* Borrowed reference to a generator, or NULL */
    /* 生成器的指针 */
    PyObject *f_gen;

    int f_lasti;                /* 上一个运行的字节码位置 */
    /* Call PyFrame_GetLineNumber() instead of reading this field
       directly.  As of 2.3 f_lineno is only valid when tracing is
       active (i.e. when f_trace is set).  At other times we use
       PyCode_Addr2Line to calculate the line from the current
       bytecode index. */
    int f_lineno;               /* 对应的Python源码行数 */
    int f_iblock;               /* index in f_blockstack */
    char f_executing;           /* whether the frame is still executing */
    PyTryBlock f_blockstack[CO_MAXBLOCKS]; /* for try and loop blocks */
    PyObject *f_localsplus[1];  /* locals+stack, dynamically sized */
} PyFrameObject;

在Python的实际执行中,会产生很多的PyFrameObject对象,然后这些对象都被链接起来,形成一条链表。

在Python中的生成器的结构体定义是一个宏,它指向一个PyFrameObject对象,表示这个生成器的上下文。

在生成器这个结构体中,有3个重要的东西:

  1. 指向生成器上下文的指针
  2. 一个指示生成器状态的字符串 未启动,停止,运行,结束
  3. 生成器的字节码

上下文 + 指令序列 + 状态

在生成器中,next 和 send 的作用相同,但是 send 可以传入一个参数。

yield from

在生成器中,可以使用return返回值,但如果 send 走到 return 语句的时候会报一个StopIteration。 return 返回值的 就在 exception 的 value 中。

如下例:

def test_fun1():
    yield 1
    return 2

gen = test_fun1()
try:
    gen.send(None)
    gen.send(None)
except StopIteration as e:
    print(e.value)

执行以上代码的输出结果是 2

yield from 有两重性质,一方面,它是一个表达式,表达式自然是有值的,他的值,就是yield from 后面生成器 return 的返回值。非常关键的一点,生成器的 yield 语句会向外产出值,但是 return 的值并不会向外产出。想要获得 return 的返回值,要么用 try 语句捕获异常要么用 yield from 表达式获取值。

可以看一下下面这例子

def test_fun1():
    yield 1
    return 2

def test_fun2():
    a = yield from test_fun1()
    print(f"yield from 表达式的值为 {a}")
    yield None

gen = test_fun2()
gen.send(None)
gen.send(None)

输出结果:

yield from 表达式的值为 2

yield from 还有一个特点就是可以将内层的生成器的返回值,传到外层。

就像下面这个例子:

def test_gen1():
    yield 1
    yield 2
    return 3

def test_gen2():
    a = yield from test_gen1()
    print(f"yield from {a}")

for i in test_gen2():
    print(i)

输出结果为:

1
2
yield from 3

内层生成器 test_gen1() 可以通过 yield from 在最外层将值取出来。

这样我们使用 yield from 可以将多个生成器连接起来。

简单理解 yield

一开始接触 yield 的时候很不好理解这个 yield的用法,不明白什么叫做生成器,什么参数传递。其实可以直接把 yield 先简单看成 return,程序执行到 yield 的时候就停止了。

先看一个简单的例子

def example():
    print("开始...")
    while True:
        res = yield 4
        print("res:",res)
g = example()
print(next(g))
print("*"*20)
print(next(g))

输出结果:

开始...
4
********************
res: None
4

上述代码的执行顺序为:

  1. 程序开始执行以后,因为foo函数中有yield关键字,所以foo函数并不会真的执行,而是先得到一个生成器g(相当于一个对象)
  2. 直到调用next方法,foo函数正式开始执行,先执行foo函数中的print方法,然后进入while循环
  3. 程序遇到yield关键字,然后把yield想想成return,return了一个4之后,程序停止,并没有执行赋值给res操作,此时next(g)语句执行完成,所以输出的前两行(第一个是while上面的print的结果,第二个是return出的结果)是执行print(next(g))的结果,
  4. 程序执行print(" " 20),输出20个*
  5. 又开始执行下面的print(next(g)),这个时候和上面那个差不多,不过不同的是,这个时候是从刚才那个next程序停止的地方开始执行的,也就是要执行res的赋值操作,这时候要注意,这个时候赋值操作的右边是没有值的(因为刚才那个是return出去了,并没有给赋值操作的左边传参数),所以这个时候res赋值是None,所以接着下面的输出就是res:None,
  6. 程序会继续在while里执行,又一次碰到yield,这个时候同样return 出4,然后程序停止,print函数输出的4就是这次return出的4

yield和return的关系和区别了,带yield的函数是一个生成器,而不是一个函数了,这个生成器有一个函数就是next函数,next就相当于“下一步”生成哪个数,这一次的next开始的地方是接着上一次的next停止的地方执行的,所以调用next的时候,生成器并不会从 example 函数的开始执行,只是接着上一步停止的地方开始,然后遇到yield后,return出要生成的数,此步就结束。

再来一个 send 的例子:

def foo():
    print("starting...")
    while True:
        res = yield 4
        print("res:",res)
g = foo()
print(next(g))
print("*"*20)
print(g.send(7))

输出结果

starting...
4
********************
res: 7
4

先大致说一下send函数的概念:此时你应该注意到上面那个的紫色的字,还有上面那个res的值为什么是None,这个变成了7,到底为什么,这是因为,send是发送一个参数给res的,因为上面讲到,return的时候,并没有把4赋值给res,下次执行的时候只好继续执行赋值操作,只好赋值为None了,而如果用send的话,开始执行的时候,先接着上一次(return 4之后)执行,先把7赋值给了res,然后执行next的作用,遇见下一回的yield,return出结果后结束。

接上之前的步骤:

  1. 程序执行g.send(7),程序会从yield关键字那一行继续向下运行,send会把7这个值赋值给res变量
  2. 由于send方法中包含next()方法,所以程序会继续向下运行执行print方法,然后再次进入while循环
  3. 程序执行再次遇到yield关键字,yield会返回后面的值后,程序再次暂停,直到再次调用next方法或send方法。

基于生成器的协程

前面说了这么多,在Python里面为什么要使用协程来解决异步的问题呢。我们先来看看前面讲到的“事件循环+回调”这种方式的问题。

“事件循环+回调”的问题

在单线程内使用前面爬虫例子中的异步编程,也确实能够大大提高程序运行效率。但是在生产项目中,要应对的复杂度会大大增加。考虑如下问题:

在实际编程中,上述问题不好避免,也确实存在这么些的缺陷。

如果是同步执行的程序,程序中的每一步都是线程的指令指针控制着的流程,而在回调版本中,流程就是编程的人需要注意和安排的。

代码风格难看是小事,但栈撕裂和状态管理困难这两个缺点会让基于回调的异步编程很艰难。

Python在事件循环+回调的基础上衍生出了基于协程的解决方案,代表作有 Tornado、Twisted、asyncio 等。

未来对象

Python 中有种特殊的对象——生成器(Generator),它的特点和协程很像。每一次迭代之间,会暂停执行,继续下一次迭代的时候还不会丢失先前的状态。

为了支持用生成器做简单的协程,Python 2.5 对生成器进行了增强(PEP 342),该增强提案的标题是 “Coroutines via Enhanced Generators”。有了PEP 342的加持,生成器可以通过yield 暂停执行和向外返回数据,也可以通过send()向生成器内发送数据,还可以通过throw()向生成器内抛出异常以便随时终止生成器的运行。

这里我们不用回调的方式了,怎么知道异步调用的结果呢?先设计一个对象,异步调用执行完的时候,就把结果放在它里面。这种对象称之为未来对象。

class Future:
    """
    未来对象
    异步调用执行完的时候,就把结果放在它里面。
    """
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

未来对象有一个result属性,用于存放未来的执行结果。还有个set_result()方法,是用于设置result的,并且会在给result绑定值以后运行事先给future添加的回调。回调是通过未来对象的add_done_callback()方法添加的。

虽然这个地方还是有 callback,但是这个 callback 和之前的不太一样。

重构Crawler爬虫

因为有了未来对象,我们先用Future来重构一下爬虫。

class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connect():
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_connect)
        yield f
        selector.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0 \r\nHost: example.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                    break

这里的 fetch方法内有了yield表达式,所以它成为生成器。生成器需要先调用next()迭代一次或者是先send(None)启动,遇到yield之后便暂停。那这fetch生成器如何再次恢复执行呢?但是目前还有生成器的启动代码。这里我们需要添加一个任务对象(Task)来启动它。

任务对象(Task)

遵循一个编程规则:单一职责,每种角色各司其职。目前还没有一个角色来负责生成器的执行和管理生成器的状态,那么我们就创建一个。

class Task:
    """任务对象"""
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send放到coro执行,即fetch,直到下次yield
            # next_future为yield返回对象
            next_future = self.coro.send(future.result)
        except StopIteration:
            return 
        next_future.add_done_callback(self.step)

上述代码中Task封装了coro对象,即初始化时传递给他的对象,被管理的任务是待执行的协程,故而这里的coro就是fetch()生成器。它还有个step()方法,在初始化的时候就会执行一遍。step()内会调用生成器的send()方法,初始化第一次发送的是None就驱动了coro即fetch()的第一次执行。

send()完成之后,得到下一次的future,然后给下一次的future添加step()回调。add_done_callback()其实不是给写爬虫业务逻辑用的。

再看一下fetch()生成器,其内部写完了所有的业务逻辑,包括如何发送请求,如何读取响应。而且注册给 selector 的回调相当简单,就是给对应的 future 对象绑定结果值。两个 yield 表达式都是返回对应的 future 对象,然后返回 Task.step() 之内,这样 Task, Future, Coroutine三者就串联在了一起。

初始化Task对象以后,把fetch()给驱动到了第 yied f 就完事了,接下来应该怎么继续。

事件循环(Event Loop)驱动协程运行

接下来,只需等待已经注册的EVENT_WRITE事件发生。事件循环就像心脏一般,只要它开始跳动,整个程序就会持续运行。

def loop():
    """事件循环驱动协程"""
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

整个重构之后的爬虫

完整的程序如下:

import socket
from selectors import EVENT_READ, EVENT_WRITE, DefaultSelector
from socket import create_connection

selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}

class Future:
    """
    未来对象
    异步调用执行完的时候,就把结果放在它里面。
    """
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connect():
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_connect)
        yield f
        selector.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0 \r\nHost: example.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                    break

class Task:
    """任务对象"""
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send放到coro执行,即fetch,直到下次yield
            # next_future为yield返回对象
            next_future = self.coro.send(future.result)
        except StopIteration:
            return 
        next_future.add_done_callback(self.step)

def loop():
    """事件循环驱动协程"""
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

if __name__ == "__main__":
    import time
    start = time.time()
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
    loop()
    print(time.time() - start)

"""
现在loop有了些许变化,callback()不再传递event_key和event_mask参数。也就是说,
这里的回调根本不关心是谁触发了这个事件,
结合fetch()可以知道,它只需完成对future设置结果值即可f.set_result()。
"""

生成器风格和回调风格总结

在回调风格中:

而基于生成器协程的风格:

yield from 改进生成器协程

如果说fetch的容错能力要更强,业务功能也需要更完善,怎么办?而且技术处理的部分(socket相关的)和业务处理的部分(请求与返回数据的处理)混在一起。

但是这些关键节点的地方都有yield,抽离出来的代码也需要是生成器。而且fetch()自己也得是生成器。生成器里捣鼓生成器,好像有些麻烦。

好在有 yield from 来解决这个问题。

yield from 语法

yield from 是Python 3.3 新引入的语法(PEP 380)。它主要解决的就是在生成器里弄生成器不方便的问题。它有两大主要功能。

第一个功能是:让嵌套生成器不必通过循环迭代yield,而是直接yield from。以下两种方式是等价的。

def gen_one():
    subgen = range(10)  
    yield from subgen

def gen_two():
    subgen = range(10)  
    for item in subgen:    
        yield item

第二个功能就是在子生成器和原生成器的调用者之间打开双向通道,两者可以直接通信。

def gen():
    yield from subgen()def subgen():
    while True:
        x = yield
        yield x+1def main():
    g = gen()

    next(g)                # 驱动生成器g开始执行到第一个 yield
    retval = g.send(1)     # 看似向生成器 gen() 发送数据
    print(retval)          # 返回2
    g.throw(StopIteration) # 看似向gen()抛入异常

通过上述代码清晰地理解了yield from的双向通道功能。关键字yield from在gen()内部为subgen()和main()开辟了通信通道。main()里可以直接将数据1发送给subgen(),subgen()也可以将计算后的数据2返回到main()里,main()里也可以直接向subgen()抛入异常以终止subgen()。

重构代码

首先我们需要将 Future 对象变成一个 iter 对象:

class Future:
    """
    未来对象
    异步调用执行完的时候,就把结果放在它里面。
    """
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        # 将Future变成一个iter对象
        yield self
        return self.result

之后

抽象socket连接的功能:

def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())

抽象单次recv()和读取完整的response功能

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk

def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)

现在重构Crawler类

class Crawler:

    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('example.com', 80))
        get = f'GET {self.url} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True

在Python 3.3 引入yield from新语法之后,就不再推荐用yield去做协程。全都使用yield from由于其双向通道的功能,可以让我们在协程间随心所欲地传递数据。

有了 yield from 这个工具,我们便可以将多个生成器串联起来。yield from 的意义在于,将这些生成器串联起来形成一颗树,并且提供了一种便捷的方法,将这颗树的叶子节点依次返回。yield from 将多个生成器连接起来的方式,我们可以使用很简单的方式就可以将所有的 yield 返回值一一提取出来。不断的对根节点的生成器 进行send 操作即可。

上面的例子介绍了 yield 和 yield from,但是它们和 asyncio 之间有什么区别和联系呢,来看一下 asyncio。