同步->异步I/O
以一个爬虫为例,下载10篇网页,用几个例子来展示从同步->异步。
同步阻塞方式
以同步阻塞方式来写这个程序也是最容易想到的方式,即依次下载好10篇网页。
import socket
def blocking_way():
sock = socket.socket()
# 阻塞
sock.connect(('example.com', 80))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii'))
response = b''
chunk = sock.rev(4096)
while chunk:
response += chunk
# 阻塞
chunk = sock.rev(4096)
return response
def sync_way():
res = []
for i in range(10):
res.append(blocking_way())
return len(res)
这段代码的执行事件大概为4.5秒。(取多次平均值)
上述代码中, blocking_way()这个函数的作用主要是建立连接,发送HTTP请求,然后从socket读取HTTP响应请求到并返回数据。
sync_way()将blocking_way()执行了10次,也就是说,我们执行了10次访问下载 example.com
由于网络情况和服务端的处理各不相同,所以服务端什么时候返回了响应数据并被客户端接收到可供程序读取,也是不可预测的。所以 sock.connect() 和 sock.recv() 这两个调用在默认情况下是阻塞的。
注:sock.send()函数并不会阻塞太久,它只负责将请求数据拷贝到TCP/IP协议栈的系统缓冲区中就返回,并不等待服务端返回的应答确认。
如果是说网络环境很差的话,创建网络连接的TCP/IP握手需要1秒,那么 sock.connect() 就得阻塞1秒。这一秒时间对CPU来说就被浪费了。同理,sock.recv() 也一样的必须得等到服务端的响应数据已经被客户端接收,才能进行后续的程序。目前的例子上只有只需要下载一篇网页,阻塞10次看起来好像没有什么问题,可是如果需求是1000w篇的话,这种阻塞的方式就显得很蠢,效率也很低下。
改进:多进程
在一个程序中,依次执行10次好像有些耗时,那么我们使用多进程,开10个同样的程序一起处理的话,也许会好一些?于是第一个改进方式便出来了: 多进程编程 。发展脉络也是如此。在更早的操作系统(Linux 2.4)及其以前,进程是 OS 调度任务的实体,是面向进程设计的OS。
import socket
from concurrent import futures
def blocking_way():
sock =socket.socket()
# 阻塞
sock.connect(('example.com', 80))
request = 'GET /HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('acsii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
return response
def process_way():
workers = 10
with futures.ProcessPoolExecutor(workers) as executor:
futs = {executor.sumbit(blocking_way) for i in range(10)}
return len([fut.result() for fut in futs])
这段代码执行时间大概为0.6秒。
按理说,使用10个相同的进程来执行这段程序,其执行时间应该是会缩短到原来的1/10,然而并没有。这里面还有一些时间被进程的切换所消耗掉了。
CPU从一个进程切换到另一个进程的时候,需要把旧进程运行时的寄存器状态,内存状态都保存好,然后再将另一个进程之前保存的数据恢复。当进程数量大于CPU核心数的时候,进程切换是必须的。
一般来说,服务器在能够稳定运行的前提下,可以同时处理的进程数在数十个到数百个规模。如果进程数量规模更大,系统运行将不稳定,而且可用内存资源往往也会不足。除了切换开销大,以及可支持的任务规模小之外,多进程还有其他缺点,如状态共享等问题。
改进:多线程
线程的数据结构比进程更加的轻量级,同一个进程可以容纳好几个线程。
后来的OS也把调度单位由进程转为线程,进程只作为线程的容器,用于管理进程所需的资源。而且OS级别的线程是可以被分配到不同的CPU核心同时运行的。
import socket
from concurrent import futures
def blocking_way():
sock = socket.socket()
# 阻塞
sock.connect(('example.com', 80))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('acsii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
# 阻塞
chunk = sock.recv(4096)
return response
def thread_way():
wokers = 10
with futures.ThreadPoolExecutor(workers) as executor:
futs = {executor.sumbit(blocking_way) for i in range(10)}
return len([fut.result(fut.result() for fut in futs)])
总运行时间大概为0.43秒。
从运行时间上来看,多线程好像已经解决了进程切换开销大的问题,而且可支持的任务数量规模,也变成了数百个到数千个。
但是由于CPython中的多线程因为GIL的存在,它们并不能利用CPU多核优势,一个Python进程中,只允许有一个线程处于运行状态。
在做阻塞的系统调用时,例如sock.connect(),sock.recv()时,当前线程会释放GIL,让别的线程有执行机会。但是单个线程内,在阻塞调用上还是阻塞的。
Python中 time.sleep 是阻塞的,都知道使用它要谨慎,但在多线程编程中,time.sleep 并不会阻塞其他线程。
除了GIL之外,所有的多线程还有通病。它们是被OS调度,调度策略是抢占式的,以保证同等优先级的线程都有均等的执行机会,那带来的问题是:并不知道下一时刻是哪个线程被运行,也不知道它正要执行的代码是什么。所以就可能存在竞态条件。如果在一个复杂的爬虫系统中,要抓取的URL由多个爬虫线程来拿,那么URL如何分配,这就需要用到“锁”或“同步队列”来保证下载任务不会被重复执行。多线程最主要的问题还是竞态条件。
非阻塞方式
千呼万唤使出来,下例是最原始的非阻塞。
import socket
def noblock_way():
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('example.com', 80))
except BlockingIOError:
# 非阻塞过程也会抛出异常
pass
request = 'GET / HTTP /1.0\r\nHost: example.com\r\n\r\n'
data = request.encode('ascii')
# 不断重复尝试发送
while True:
try:
sock.send(data)
# send不出现异常,停止
break
except OSError:
pass
response = b''
while True:
try:
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
break
except OSError:
pass
return response
def sync_way():
res = []
for i in range(10):
res.append(noblock_way())
return len(res)
程序总耗时约4.3秒。
执行完这段代码的时候,感觉好像是被骗了,代码的执行时间和非阻塞方式差不多,而且程序更复杂了。要非阻塞何用?
代码sock.setblocking(False)告诉OS,让socket上阻塞调用都改为非阻塞的方式。非阻塞就是在做一件事的时候,不阻碍调用它的程序做别的事情。上述代码在执行完 sock.connect() 和 sock.recv() 后的确不再阻塞,可以继续往下执行请求准备的代码或者是执行下一次读取。第8行要放在try语句内,是因为socket在发送非阻塞连接请求过程中,系统底层也会抛出异常。connect()被调用之后,立即可以往下执行第12和13行的代码。
虽然 connect() 和 recv() 不再阻塞主程序,空出来的时间段CPU没有空闲着,但并没有利用好这空闲去做其他有意义的事情,而是在循环尝试读写 socket (不停判断非阻塞调用的状态是否就绪)。还得处理来自底层的可忽略的异常。也不能同时处理多个 socket。 所以总体执行时间和同步阻塞相当。
非阻塞改进
epoll
其实判断非阻塞调用是否就绪可以交给OS来做,不用应用程序自己去等待和判断,可以用这个空闲时间去做其他的事情。
OS将O/I的变化都封装成了事件,比如可读事件、可写事件。而且提供了相应的系统模块以供调用来接收事件通知。这个模块就是select,让应用程序可以通过select注册文件描述符和回调函数。当文件描述符的状态发生变化时,select 就调用事先注册的回调函数。
select因其算法效率比较低,后来改进成了poll,再后来又有进一步改进,BSD内核改进成了kqueue模块,而Linux内核改进成了epoll模块。这四个模块的作用都相同,暴露给程序员使用的API也几乎一致,区别在于kqueue 和 epoll 在处理大量文件描述符时效率更高。一般的Linux服务器是使用的 epoll。
回调(callback)
将I/O事件的监听交给OS来处理,那么OS在知道I/O状态发生改变之后应该如何处理呢,这里一般都是通过回调的方式。
把发送数据和读取数据封装成独立的函数,用epoll代替应用程序监听socket状态,而且需要告知epoll “如果socket状态变为可以往里写数据(连接建立成功了),请调用HTTP请求发送函数。如果socket 变为可以读数据了(客户端已收到响应),请调用响应处理函数。”
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
# selectors模块是对底层select/poll/epoll/kqueue的封装
# DefaultSelector类会根据 OS 环境自动选择最佳的模块
"""
创建Crawler 实例;
调用fetch方法,会创建socket连接和在selector上注册可写事件;
fetch内并无阻塞操作,该方法立即返回;
重复上述3个步骤,将10个不同的下载任务都加入事件循环;
启动事件循环,进入第1轮循环,阻塞在事件监听上;
当某个下载任务EVENT_WRITE被触发,回调其connected方法,第一轮事件循环结束;
进入第2轮事件循环,当某个下载任务有事件触发,执行其回调函数;此时已经不能推测是哪个事件发生,因为有可能是上次connected里的EVENT_READ先被触发,也可能是其他某个任务的EVENT_WRITE被触发;(此时,原来在一个下载任务上会阻塞的那段时间被利用起来执行另一个下载任务了)
循环往复,直至所有下载任务被处理完成
退出事件循环,结束整个下载程序
"""
selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
class Crawler:
"""
如果用这种方法抓去,需要创建10个Crawler实例,这样就会有20个事件发生
"""
def __init__(self, url):
self.url = url
self.sock = None
self.response = b''
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('example.com', 80))
except BlockingIOError:
pass
selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
def connected(self, key, mask):
selector.unregister(key.fd)
get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
self.sock.send(get.encode('ascii'))
selector.register(key.fd, EVENT_READ, self.read_response)
def read_response(self, key, mask):
global stopped
# 如果响应大于4kb,下次循环继续
chunk = self.sock.recv(4096)
if chunk:
self.response += chunk
else:
selector.unregister(key.fd)
urls_todo.remove(self.url)
if not urls_todo:
stopped =True
def loop():
while not stopped:
# 阻塞,直到一个事件发生
events = selector.select() # 这是一个阻塞调用
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)
if __name__ == "__main__":
import time
start = time.time()
for url in urls_todo:
crawler = Crawler(url)
crawler.fetch()
loop()
print(time.time() - start)
总体耗时约0.45秒。
与之前函数不太一眼的地方是,我们将下载10个不同的URL界面,然后将URL的相对路径存储在 urls_todo 中。具体的改进如下。
首先是不断尝试 send() 和 recv() 这两个循环被取消掉了。
其次,导入了selectors模块,并创建了一个DefaultSelector 实例。Python标准库提供的selectors模块是对底层select/poll/epoll/kqueue的封装。DefaultSelector类会根据 OS 环境自动选择最佳的模块,那在 Linux 2.5.44 及更新的版本上都是epoll了。
然后分别注册了socket可写事件(EVENT_WRITE)以及可读事件(EVENT_READ)发生后应该采取的回调函数。
但是这里有一个问题,我们如何才能知道这10个Crawler实例创建的20个事件,哪个是当前正在发生的事件,从selector中拿出来,并且得到对应的回调函数去执行呢?
事件循环
所以我们在代码结尾加入了事件循环,写一个函数,循环地去访问selector模块,等待它告诉我们当前是哪个事件发生了,对应的应该是哪个回调函数。
在 loop() 这个事件循环的函数中,采用了stopped全局变量来控制事件循环的停止,当urls_todo消耗完毕之后,会标记stopped为True。
在事件循环里面有一个阻塞调用,selector.select() 。如果事件不发生,那么应用程序就没事件可处理,所以就干脆阻塞在这里等待事件发生。那可以推断,如果只下载一篇网页,一定要connect()之后才能send()继而recv(),那它的效率和阻塞的方式是一样的。因为不在connect()/recv()上阻塞,也得在select()上阻塞。
所以,selector机制(后文以此称呼代指epoll/kqueue)是设计用来解决大量并发连接的。当系统中有大量非阻塞调用,能随时产生事件的时候,selector机制才能发挥最大的威力。
在单线程内用 事件循环+回调 搞定了10篇网页同时下载的问题。这,已经是异步编程了。虽然有一个for 循环顺序地创建Crawler 实例并调用 fetch 方法,但是fetch 内仅有connect()和注册可写事件,而且从执行时间明显可以推断,多个下载任务确实在同时进行!
上述代码异步执行的过程:
- 创建Crawler 实例;
- 调用fetch方法,会创建socket连接和在selector上注册可写事件;
- fetch内并无阻塞操作,该方法立即返回;
- 重复上述3个步骤,将10个不同的下载任务都加入事件循环;
- 启动事件循环,进入第1轮循环,阻塞在事件监听上;
- 当某个下载任务EVENT_WRITE被触发,回调其connected方法,第一轮事件循环结束;
- 进入第2轮事件循环,当某个下载任务有事件触发,执行其回调函数;此时已经不能推测是哪个事件发生,因为有可能是上次connected里的EVENT_READ先被触发,也可能是其他某个任务的EVENT_WRITE被触发;(此时,原来在一个下载任务上会阻塞的那段时间被利用起来执行另一个下载任务了)
- 循环往复,直至所有下载任务被处理完成
- 退出事件循环,结束整个下载程序
做异步编程,上述的“事件循环+回调”这种模式是逃不掉的,尽管它可能用的不是epoll,也可能不是while循环。
但是在某些异步编程中并没有看到 CallBack 模式呢?比如Python的异步编程中,其主角是协程。