异步(asynchronous)编程模式是相对于同步方式的另一种编程思路。 C10K的问题提出后, 各种编程语言都出现了解决高并发的技术栈, 而早在Python2时期, Twisted、Tornado和Gevent这三个库用不同的技术路径解决了高并发。[1] 其中就有用到Python的一些异步实现方法。 此文将对异步编程的基本思想和Python的实现方式进行阐述。
[目录]
这一概念是针对主程序来说的, 如果主程序遇到阻塞的任务时选择等待,那么这种行为就是同步的, 如果不等待选择执行其他的任务,就是异步的;
这一概念是针对任务来说的, 如果某项任务在执行费时IO操作时不能挂起跳出(不能把执行权归还主程序)那么此项任务就是阻塞的, 相反就是非阻塞的。
其实不必将上面四个概念区分的很清楚,只需明白异步非阻塞编程的基本思路是将IO密集型的任务时间节省出来让CPU尽可能多地去完成计算密集型任务;
Python最早是用生成器写的协程,3.4后可以用asyncio.coroutine装饰一个协程,3.5加入了新的关键字async\await协程成为新的语法。3.6还引入了异步生成器。
协程的定义由很多,我比较认同可以挂起中断的函数 这一说法,前面概念中讲到非阻塞的任务是可以挂起中断的,那么协程正可以实现此项任务;
协程包含两种情况:
协程的运作方式:
协程的典型应用案例,摘自《Fluent Python》
# 计算移动平均值
from collections import namedtuple
Result = namedtuple('Result', 'count average')
# 子生成器
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total/count
return Result(count, average)
# 委派生成器
def grouper(results, key):
while True:
results[key] = yield from averager()
# 客户端代码,即调用方
def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key)
# 协程需要激活
next(group)
for value in values:
# 传递值给自生成器
group.send(value)
group.send(None) # 重要!
report(results)
# 输出报告
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))
data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
main(data)
调用协程函数并不能使该协程运行。调用协程函数所返回的协程对象,在被你安排执行之前,不会做任何事情。有两种方式可以启动它:[2]
其实解决高并发就是实现的程序的异步和非阻塞,而多线程\多进程是大多数比较熟悉的方案,将阻塞任务挂起到多个线程, 由操作系统实现线程\进程间的调度,concurrent.futures对象的出现使得多线程的非阻塞任务可以使用回调函数, 参考下例:
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
from concurrent.futures import Future
from multiprocessing import Pool
from functools import partial
import time
def get_html(times):
time.sleep(times)
print("get page {} success".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
# 回调函数
def done_callback(msg, future):
print(msg + 'this is called by callback!', future.result())
urls = [3,2,4]
all_task = [executor.submit(get_html, (url)).add_done_callback(partial(done_callback, 'XXX')) for url in urls]
wait(all_task, return_when=FIRST_COMPLETED)
executor.submit返回一个future对象,这个对象可以存储结果、取消、添加回调等等。熟悉前端的朋友们肯定想起来这与javascipt的promise对象的思想一致;
还有一种常见的写法是(使用进程与线程池,减少频繁创建线程):
NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'
def fetch(a):
r = requests.get(URL.format(a))
return r.json()['args']['a']
def requests_threadpool():
start = time.time()
# 自动等待完成
with ThreadPoolExecutor(max_workers=3) as executor:
for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)):
print('fetch({}) = {}'.format(num, result))
多进程ProcessPoolExecutor的接口与线程基本相同;他们实现异步非阻塞的问题是:
现在希望用单线程的方式实现异步和非阻塞,Python的协程实现了非阻塞任务,现在只需要一个任务循环机制就可以把任务的调度交由程序本身来完成,不经过操作系统调度可省下不少的时间。
Python3.4正式加入的asyncio(读音请自行谷歌) 为单线程实现异步提供了任务循环机制,同时为协程提供了一些包装接口,使得协程不许要进过send\next等繁琐操作。同时引入了 可等待对象(Awaitable) 的概念;
可等待对象要实现的协议有__aiter__, anext,aenter,aexit;asyncio接口可以直接用的可等待对象是:
future对象对协程包装成结果容器,使其可以像多线程中的future一样可以使用回调等功能;Task是future的子类,用于同时启动多个协程\future
对于这三个对象不理解的可以直接看官方文档中的阐述,其实大部分情况下不必使用底层的future对象;
几个asyncio常用的接口:
上面的接口都是3.7出现的,之前的写法中还会使用带队列loop, 在3.7中致力于取消loop的写法,使得用户更容易上手;
再次回顾一下asyncio最主要解决的问题:为可等待对象提供了调度多列,使得用户可以像写同步代码一样实现异步非阻塞操作,回避了写回调函数;
协程中不能写阻塞的代码,所以就不能使用阻塞的任务和库,如requests, pymysql等, 非得使用也可以,异步+多线程的方式仍然有效,而且asyncio也跟多进程一样实现了他自己的同步机制,一定程度上保证线程安全;但是我们都希望所有的任务都可以使用异步完成,目前,aio生态圈正在逐步完善,已经有如下项目:
Libraries to build web applications.
Libraries to implement applications using message queues.
Libraries to connect to databases.
Libraries to communicate in your network.
Libraries to test asyncio based applications.
Alternative asyncio loop implementations.
Other awesome asyncio libraries.
run()
function that handles all the usual boilerplate for startup and graceful shutdown.Documentation, blog posts, and other awesome writing about asyncio.
Recordings of awesome talks about asyncio.
Topics of Interest (Python Asyncio) | screencast | slides - PyCon Brasil 2015 keynote (David Beazley). |
Python Asynchronous I/O Walkthrough | blog post - 8-part code walkthrough (Philip Guo). |
对着asyncio的文档示例多理解几个程序后, 我们就可以写几个IO密集操作的程序来测试与喜爱各种方案的效率了,我编写了一个简单的爬虫,没有写储存的IO,只测试请求的IO。分别用了以下几个方案:
还有一个国内开发者基于asyncio和aiohttp写的一个scrapy-like的爬虫框架ruia, 速度也不错;
完整的代码在这里
可以看出在requests库+多线程上面套用异步是无效的,而aiohttp的多线程并没有效率上的明显提升。