无穷的妖精乡并发!
面向刚学会requests.get
的萌新:如果只是为了性能,请别学scrapy
了。本驱逐舰的最高速度极易使网站瘫痪。请认真评估您的操作对网站造成的压力。使用本舰的一切后果由您负责。
虽然用起来很简单,背后的东西可能没那么简单。入门之后还是需要读readme。最好也看看gevent
和asyncio
低层API的文档吧。
如果您是大佬,建议直接无视本文,或者只看最小核心代码。
请不要吝惜您的star! 在代码仓库首页找到star按钮,点一下即可。需要GitHub账号!
由于学习源代码是不可避免的,因此先把最小化的核心代码放在这里吧。稍后会解释这些代码。
# github.com/Hecate2/Ignareo
# fixed IgnaleoG config
import asyncio,gevent
from gevent import monkey
monkey.patch_all() # gevent将能够并发执行I/O任务,且取代asyncio的事件循环
# 注意:monkey.patch_all()后程序很难调试。很可能调试就会报错!
import tornado.ioloop
import tornado.web
from tornado.platform.asyncio import AsyncIOMainLoop
portList=tuple([i for i in range(55568,55569)])
worker_loop=asyncio.get_event_loop()
# end fixed IgnaleoG config
class MainHandler(tornado.web.RequestHandler):
def get(self):
# 服务器每收到一次GET就会执行这里的代码
pass # 可以gevent.spawn
def post(self):
# 服务器每收到一次POST就会执行这里的代码
pass # 可以gevent.spawn
# gevent.spawn(随便什么函数的名字, 参数, 参数……)
def run_proc(port):
AsyncIOMainLoop().install() # 让tornado使用asyncio的事件循环
# app = tornado.web.Application([
# (r'/', MainHandler),
# ])
# app.listen(port)
# 可以gevent.spawn
# 写在这里的代码仅在Ignareo启动时运行一次
worker_loop.run_forever()
# 让asyncio的事件循环永远运行下去!毕竟这是个永远运行的服务器
# 这之后写的任何代码都不会被执行
if __name__ == '__main__':
from multiprocessing import Process
length=len(portList)
for port in range(length-1): # 启动多个内容一致的进程,每进程监听一个端口
p=Process(target=run_proc, args=(portList[port],))
p.start()
run_proc(portList[length-1])
我们先了解一下异步编程中的“事件循环”(概念取自asyncio
)。当你的程序执行输入输出(I/O)任务(例如requests.get
)时,CPU通常是在等待外部硬件(比如网卡或者远在地球另一端的某服务器),于是CPU自己是无事可做的。如果这些无事可做的时间拿来处理其他的I/O任务,是否可以实现大量的I/O并发呢?因此人类搞出了事件循环来利用起CPU的空闲时间。你可以向事件循环注册I/O任务,然后不需要管它! 当这项I/O任务完成后,事件循环将通知CPU处理任务结果。
于是你可以把事件循环驱动的程序视为两部分:纸面上你写的代码,以及背后的事件循环。你无需关心事件循环如何运行。只要把你纸面代码中含I/O的程序注册给事件循环,就可以等待结果。事件循环可以处理很多任务,并把结果交回给等待它的上下文。
举个例子:
s = requests.Session()
tasks = [gevent.spawn(s.get, url, ...) for _ in range(10)] # 发起10个get
results = gevent.joinall(tasks) # 等待所有任务完成,并获取所有任务的结果
这里gevent.spawn(s.get, url, ...)
就是向gevent
的事件循环注册了一个get任务。使用gevent.spawn
注册任务后,我们不需要等待所有get任务完成,而是立刻拿到一把用于取结果的"钥匙"(tasks里的每一项都是一把钥匙)。随后我们gevent.joinall(tasks)
,拿着tasks这一大批钥匙去坐等任务完成。
然而对于流程很长的I/O任务,如果每次都这样显式地等待结果,代码可能会非常冗长。我们可以直接用gevent.spawn
注册一个函数。假设我们有一个抢购任务,货物会在任意某个时刻上架,而你需要不停地用大量session监控货物上架情况,一旦上架尽快下单:
def monitor_and_order(session):
while 1:
response = session.get(monitored_url)
if good_available(response.text): # 解析html判断有没有货
post_data = parse_post_data(session, response.text) # 从响应结果提取post参数
return order(session, post_data) # 下订单
# else # 实际上不需要else。继续循环get被监控的网页即可
tasks = [gevent.spawn(monitor_and_order, requests.Session()) for _ in range(5)]
gevent.joinall(tasks)
使用上面的代码,相当于我们一次设置了5个“人”,每个人一直不停地刷网页,看到有货就下单。
一个稍微复杂而且实际一点的例子可见于本代码仓库的User_training/purchasing
目录。
结论是:我们可以往事件循环里注册任意复杂的函数。 这些函数的流程可以很长很复杂。只要函数内部主要的时间消耗在I/O而不是具体计算上,事件循环应当能为我们同时处理很多I/O任务。
仅仅这一点就比scrapy
之类的框架舒服很多了。你可以任意地用requests
写一个漫长的爬虫流程,然后无缝地迁移到gevent
里。当然,很多websocket
或者数据库I/O之类的任务,原则上都是可以交给gevent
的。
到此为止的内容是百度一下就能学会的。然而当你纸面上的程序在joinall
坐等的过程中,背后的事件循环在做什么呢?
——当然是在坐等外部的硬件(网卡或远方的服务器)响应。
然而应当注意到,当你纸面上的代码等待某些任务完成时,其他未被你等待的任务也在运行。你的程序可能有多处在等待不同任务完成,而事件循环会将完成的任务的结果送回到正在等待它的上下文。你的任何一处等待都不会让事件循环驱动的任务停止运行。
Ignareo的特殊之处在于,你可以在事件循环运行的过程中,继续注册新的I/O任务。
一般而言,事件循环在运行的过程中,你只能对着一个命令行窗口发呆,无法向程序内部人工输入新的指令(使用Ctrl+C
这类中止命令不算……)。不过,你可以要求事件循环监听某些类型的输入,然后通知CPU来处理这些输入。一种轻松的做法当然是监听外部传入的HTTP请求。
Ignareo可以包含一个HTTP服务器。这个服务器将监听你从外部发来的请求。根据你在服务器代码里定义的处理请求的方式,我们可以在收到外部请求后向事件循环注册新任务。这样你可以在任意时刻发起新的I/O任务!
这里我选了tornado
实现HTTP服务器,因为它性能不错,兼容性好(最小核心代码中包含一些奇妙的操作让tornado
运行在asyncio
或gevent
下),配置简单,安全性较好。
虽然我几乎没有造任何官方轮子,但Ignareo可以自由调用海量现成工具。
IgnareoG使用gevent
推动;IgnareoA使用asyncio
推动。只要是能被事件循环推动的库和函数,都可以无缝无阻塞地加入Ignareo,因此你可以使用的工具远远比scrapy
多。请不要嫌我造的官方轮子太少哦~
Ignareo本身就是个服务器,当然可以像其他web服务一样任意部署成千上万个节点!我应该不会有时间提供自己造的分布式部署管理工具,所以祝您愉快地使用现成轮子,包括但不限于docker(-compose)
之类的容器工具和各类微服务治理工具。
实际业务中,任务参数可能不止来源于一处。你可能会在Ignareo内部构建队列来发起需要多种不同参数的I/O任务。我的建议是,如果队列的组件非常复杂,可以把队列切面独立为一个http服务,架在任务信息源与Ignareo之间,保持Ignareo的无状态性。
在IgnareoG最小核心代码中合适的位置 填入:
task = gevent.spawn(函数名, 参数, 参数, ...)
# 注意,不是gevent.spawn(函数名(参数, 参数, ...))
就可以让你的task
运行,并且你的task
对其他I/O任务几乎没有性能影响。如果你想要task
的返回值:
result = gevent.joinall([task])
一切就是这么简单。