在Python中使用Asyncio系统(3-6)优雅地开启和关闭协程
off999 2025-04-27 15:36 84 浏览 0 评论
优雅地开启和关闭协程
大多数基于异步的程序都是基于网络的并且长期运行的应用程序。这个领域在处理如何启动和关闭的过程中有惊人的复杂性。
在这两个操作中,启动一般比较简单。启动异步应用程序的标准方式是有一个main()协程函数,并用asyncio.run()调用它,就像本章开头的示例3-2所演示的那样。
一般来说,启动都会相当的简单直接;比如前面描述的服务器案例,你可能在文档中不止一次的阅读到它。我们待会儿要在后面的章节简要通过代码来介绍一个服务器启动的演示。
关闭程序一般更复杂一些。为了关闭,我之前提到了在asyncio.run()中的准备步骤。当async def main()函数退出时,要采取以下操作:
- 收集所有仍在排队的任务对象(如果有的话)。
- 取消这些任务(这个步骤会在每个运行的协程中抛出CancelledError异常,就是你在协程函数的代码里使用try/except来处理的那个异常)。
- 把所有这些任务放到一个组任务中。
- 在组任务中使用run_until_complete()等待所有这些任务完成,然后抛出或者处理CancelledError。
asyncio.run()为你执行上面提到的这些操作,但尽管有这些帮助,在构建最初几个重要的asyncio应用程序的过程中,仍将尝试在关闭期间清除错误消息,就像这样:“Task was destroyed but it is pending!”。发生这种情况是因为应用程序没等到执行完前面的一个或几个步骤就关闭主程序了。例3-29是一个引发这个烦人错误的示例。
示例 3-29 未决任务的销毁程序
# taskwarning.py
import asyncio
async def f(delay):
await asyncio.sleep(delay)
loop = asyncio.get_event_loop()
t1 = loop.create_task(f(1))
t2 = loop.create_task(f(2))
loop.run_until_complete(t1)
loop.close()- (L8)任务1将运行1秒。
- (L9)任务2将运行2秒。
- (L10)只能在任务1完成前才运行。
运行这段代码将会有以下输出:
$ python taskwarning.py
Task was destroyed but it is pending!
task: <Task pending coro=<f() done, defined at [...snip...]>这个错误信息告诉你当循环关闭时,有些任务还没有完成。我们想要避免这种情况,这就是为什么一般的关闭过程是收集所有未完成的任务,终止这些未完成的任务,然后等它们在关闭循环之前全部完成。asyncio.run()为你完成了所有这些步骤,但重要的是要详细了解流程,这样你就能够处理更复杂的情况。
我们再看一个更详细的演示了所有这些阶段的代码示例。示例3-30是一个基于telnet的echo服务器的迷你案例研究。
示例 3-30 异步应用程序的生命周期
# telnetdemo.py
import asyncio
from asyncio import StreamReader, StreamWriter
async def echo(reader: StreamReader, writer: StreamWriter):
print('New connection.')
try:
while data := await reader.readline():
writer.write(data.upper())
await writer.drain()
print('Leaving Connection.')
except asyncio.CancelledError:
print('Connection dropped!')
async def main(host='127.0.0.1', port=8888):
server = await asyncio.start_server(echo, host, port)
async with server:
await server.serve_forever()
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Bye!')- (L5) 服务器将使用echo()协程函数为每个连接创建一个协程。这个函数使用streams API与asyncio进行网络连接。
- (L8) 为了保持连接的活性,我们要使用一个无限循环来等待消息接入。
- (L9) 然后把数据直接原样返回给发送方,就是把字母全部变成大写。
- (L12) 如果这个任务被终止,我们会打印一条消息。
- (L16) 这段启动TCP服务器的代码直接取自Python 3.8的官方文档。
启动echo服务器后,你可以使用telnet连接到它并跟它交互:
$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hi!
HI!
stop shouting
STOP SHOUTING
^]
telnet> q/
Connection closed.这次会话的服务器输出如下所示(服务器继续运行,直到我们按Ctrl-C):
$ python telnetdemo.py
New connection.
Leaving Connection.
^CBye!在刚才展示的Telnet会话中,客户机(即Telnet)在服务器停止之前关闭了连接,现在让我们看看如果在连接处于活跃状态时关闭服务器会发生什么。我们会看到服务器进程的如下输出:
$ python telnetdemo.py
New connection.
^CConnection dropped!
Bye!这时你可以看到CancelledError的异常处理程序被触发。现在我们假设这是一个真实的生产级应用程序,我们希望把所有关于断开连接的事件发送到监控服务。代码示例可能被修改为示例3-31。
示例 3-31 在取消步骤中创建任务
# telnetdemo.py
import asyncio
from asyncio import StreamReader, StreamWriter
async def send_event(msg: str):
await asyncio.sleep(1)
async def echo(reader: StreamReader, writer: StreamWriter):
print('New connection.')
try:
while (data := await reader.readline()):
writer.write(data.upper())
await writer.drain()
print('Leaving Connection.')
except asyncio.CancelledError:
msg = 'Connection dropped!'
print(msg)
asyncio.create_task(send_event(msg))
async def main(host='127.0.0.1', port=8888):
server = await asyncio.start_server(echo, host, port)
async with server:
await server.serve_forever()
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Bye!')- (L5) 假设这个协程实际上要联系外部服务器以提交事件通知。
- (L18) 因为事件通知器涉及网络访问,所以这样的调用通常是在单独的异步任务中进行的;这就是我们在这里使用create_task()函数的原因。
但是,这段代码有一个错误。如果我们重新运行这个示例,并确保在连接处于活跃状态时停止服务器(使用Ctrl-C),这个bug就变得很明显了:
$ python telnetdemo.py
New connection.
^CConnection dropped!
Bye!
Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<send_event() done, ...>要理解为什么会发生这种情况,我们必须回到asyncio.run()在关闭阶段所做的清理事件的顺序;特别要注意更重要的部分是,当我们按下Ctrl-C时,所有当前活跃的任务都会被收集和注销。这时候,只有那些当前活跃的任务会被等待直到完成,并且asyncio.run()在所有那些任务完成之后才返回。修改后的代码中的错误是,我们在现有的“echo”任务的注销处理步骤中创建了一个发送事件的新任务。只有在asyncio.run()收集并注销了流程中的所有任务之后,才会创建这个新任务。
这就是为什么了解asyncio.run()是怎么运行的很重要。
建议:一般的经验法则是,尽量避免在CancelledError异常处理程序中创建新任务。如果必须,也一定要等待同一函数范围内的新任务或未来任务。
最后:如果你正在使用一个库或框架,请确保按照它的文档来执行启动和关闭操作。第三方框架通常提供自己的启动和关闭函数,并提供自定义的事件钩子。你可以在第115页的“案例研究:缓存失效”中看到Sanic框架中使用这些钩子的例子。
gather()函数中的return_exceptions=True是什么意思?
你可能已经注意到,示例3-3和示例3-1,在关闭步骤中调用gather()时的关键字参数return_exceptions=True,但我当时故意悄悄地没有提到它。asyncio.run()也在内部使用gather()和return_exceptions=True,现在是进一步讨论这个的时候了。
不幸的是,这个参数默认是gather(…return_exceptions = False)。这个默认值对于大多数情况都是有问题的,包括关闭过程,这就是为什么asyncio.run()把参数设置为True。直接解释有点复杂;相反,让我们通过一系列详细解释来理解以上的观点会更容易:
- run_until_complete()函数里面操作的是一个future;在关闭任务期间,它是gather()返回的future。
- 如果里面的future引发异常,这个异常就会从run_until_complete()抛出,这将导致循环停止运行。
- 如果run_until_complete()函数操作的是一个组future,在任何子任务中引发的任何异常如果不在子任务中处理掉,就会在这个组future中引发异常。注意,这种情况也包括CancelledError。
- 如果只有一部分任务处理CancelledError,而其他任务不处理CancelledError,那么不处理CancelledError的任务将导致循环停止。这意味着循环在所有任务完成之前就会异常终止。
- 在关闭任务的过程中,我们真的不想发生这种行为。我们希望run_until_complete()只在在它组中的所有子任务都已完成时才能终止,不管其中的一些任务是否引发异常。
- 于是我们gather(*, return_exceptions=True):这个设置会让组future把来自于子任务的异常视为返回值,这样它们就不会冒出来干扰到run_until_complete()。
这样就知道了return_exceptions=True和run_until_complete()之间的关系。用这种方式捕获异常的一个不良后果是:有些错误可能没有引起你的注意,因为它们现在正在一组任务的内部处理。如果这是一个麻烦,你可以从run_until_complete()获取输出列表,并扫描它以查找Exception的任何子类,然后编写适合你情况的日志消息。示例3-32演示了这种方法。
示例 3-32 所有的任务都会完成
# alltaskscomplete.py
import asyncio
async def f(delay):
await asyncio.sleep(1 / delay)
return delay
loop = asyncio.get_event_loop()
for i in range(10):
loop.create_task(f(i))
pending = asyncio.all_tasks()
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f'Results: {results}')
loop.close()- (L5) 这里一定会触发一个异常
这是运行输出:
$ python alltaskscomplete.py
Results: [6, 9, 3, 7, ...
ZeroDivisionError('division by zero',), 4, ...
8, 1, 5, 2]如果没有设置return_exceptions=True,就会从run_until_complete()引发ZeroDivisionError,停止循环,从而阻止其他任务完成。
在下一节中,我们将讨论信号处理(KeyboardInterrupt之外的信号),但在此之前,有必要记住,优雅地关闭是网络编程中比较困难的方面之一,对于asyncio来说也是这样。本节中的信息仅仅是一个开始。我鼓励你在自己的自动化测试中使用明确的关闭测试。不同的应用程序通常需要不同的策略。
建议:我在Python包索引(PyPI)上发布了一个名为aiorun的小包,主要用于我自己在处理asyncio关闭方面的实验和学习,它整合了本节中的许多想法。可能对于你在代码进行修改,并围绕asyncio关闭场景试验你自己的想法方面是有用的。
相关推荐
- 无线wifi路由器怎么安装(请问无线路由器怎么安装)
-
安装的方法/步骤:1、怎么安装无线路由器呢?首先把网线的其中一头插入进光猫里面。2、接着用网线的另一头插入进无线路由器的蓝色接口处,这样就安装好无线路由器啦。3、点击打开电脑浏览器,输入路由器设置地址...
- fat32格式化精灵(格式化fat32格式工具)
-
内存卡格式化一般有两种方式:第一种是直接将内存卡插入手机的卡托,然后进入设置——运行及内存管理,点击格式化SD卡即可完成。当然有一些手机是不支持外置的内存卡插入,这就需要用OTG线插入手机,点击手机的...
- 外置光驱安装win7系统(外置光驱安装操作系统)
-
苹果电脑、电源适配器丶光盘装系统(电脑有光驱、或者外接光驱)光盘安装准备:win764位纯净版安装盘,如果使用的苹果电脑有光驱,优先使用自带光驱安装;如电脑没有光驱,可以是用外接USB光驱安装。光盘...
- win7x86是32位还是64位
-
32位win7x86是32位操作系统,win7x64是64位操作系统。扩展资料Windows7,中文名称视窗7,是由微软公司(Microsoft)开发的操作系统,内核版本号为WindowsNT...
- 用我告诉你安装win7(安装win7教程)
-
方法一:使用工具在线一键下载安装win7(win7正式版只需使用正版密钥激活即可)1、在电脑安装好小白一键重装系统工具打开,选择原版win7旗舰版系统,点击安装此系统。2、等待软件自动下载系统镜像文件...
- sd卡如何修复(如何修复sd卡视频教程)
-
修复SD卡的三个步骤如下:1.使用磁盘检测工具检查SD卡的错误:您可以使用Windows操作系统中自带的磁盘检查工具或第三方软件来检查并修复SD卡中的错误。2.格式化SD卡:如果检查后发现错误无法...
- 安卓手机杀毒软件哪个最好用
-
腾讯手机管家的守护老人安全功能版本我在用,我来说说吧。此版本是专门为守护老人安全设计推出的,不但有效拦截诈骗短信,电话,木马病毒,钓鱼网址,辟谣功能可以帮助老人立即分辨养生讯息,银行卡故障讯息,保险异...
- xp3用什么模拟器打开(xp3用什么模拟器打开好)
-
可以按照以下的步骤排查解决:首先,游戏必须要使kirikiri引擎,这点可以从文件中是否含有部分xp3后缀的文件来判断然后用模拟器打开date.xp3就行了,部分汉化游戏是直接打开exe程序如果遇到d...
- 固态硬盘用mbr还是guid(固态硬盘guid好还是mbr好)
-
如果电脑原装系统是win8或者以上的,那么硬盘分区表格式为GUID(GPT)格式的;如果是win7以下的,那么一般就是MBR的。主引导记录(MBR)是计算机开机后访问硬盘时所必须要读取的首个扇区,由分...
- 为什么fps大神都是400dpi(fps为什么高)
-
400DPI,在游戏里调节不同英雄的鼠标灵敏度,可以保证最小范围微调改动鼠标移动速度。因为DPI和灵敏度是乘积关系。举个例子:如果你玩麦克雷时鼠标DPI是3200,游戏内灵敏度是1。但你切换到源氏和闪...
- 系统集成项目管理工程师难考吗
-
系统集成项目管理工程师考试的普遍通过率是在10%左右,但是并不表示考试真的有那么难。因为考试本身没有报考条件的限制,且考试报名费用很低,很多人都不重视考试。所以通过率普遍偏低,只要你认真备考,有一...
- 360影视大全下载2025免费版(下载360影视大全最新版下载安装到手机版)
-
你好朋友360影视大全里的很多视频都是免费的,建议安装最新的360影视大全就可以了打开360视频,搜索自己需要的视频,点击360播放器右下角的下载箭头,即可将视频进行下载,下载完毕之后视频会保存在36...
- 360安全卫士手机版下载(360安全卫士官方免费下载手机版5.5.0)
-
相当靠谱360手机卫士是一款由奇虎网推出的功能强、效果好、受用户欢迎的上网安全软件。360安全卫士拥有查杀木马、清理插件、修复漏洞、电脑体检、保护隐私等多种功能,并独创了“木马防火墙”“360密盘”等...
- deepin和统信uos(统信和deepin的区别)
-
差不多。1Deepin原名LinuxDeepin、deepinos、深度操作系统,于2014年4月改名Deepin。deepin团队基于Qt/C++(用于前端)和Go(用于后端)开发了的全新深度桌...
- 三星驱动(三星驱动板)
-
驱动是必须装的,但不需要单独安装驱动。 1、电脑的所有硬件,必然要装驱动,键盘、鼠标什么的,都是有驱动的。驱动是软件和硬件结合的桥梁。但多数普通常见的硬件,驱动是widnows系统自带的,不需要用户...
欢迎 你 发表评论:
- 一周热门
-
-
抖音上好看的小姐姐,Python给你都下载了
-
全网最简单易懂!495页Python漫画教程,高清PDF版免费下载
-
Python 3.14 的 UUIDv6/v7/v8 上新,别再用 uuid4 () 啦!
-
飞牛NAS部署TVGate Docker项目,实现内网一键转发、代理、jx
-
python入门到脱坑 输入与输出—str()函数
-
宝塔面板如何添加免费waf防火墙?(宝塔面板开启https)
-
Python三目运算基础与进阶_python三目运算符判断三个变量
-
(新版)Python 分布式爬虫与 JS 逆向进阶实战吾爱分享
-
失业程序员复习python笔记——条件与循环
-
系统u盘安装(win11系统u盘安装)
-
- 最近发表
- 标签列表
-
- python计时 (73)
- python安装路径 (56)
- python类型转换 (93)
- python进度条 (67)
- python吧 (67)
- python的for循环 (65)
- python格式化字符串 (61)
- python静态方法 (57)
- python列表切片 (59)
- python面向对象编程 (60)
- python 代码加密 (65)
- python串口编程 (77)
- python封装 (57)
- python写入txt (66)
- python读取文件夹下所有文件 (59)
- python操作mysql数据库 (66)
- python获取列表的长度 (64)
- python接口 (63)
- python调用函数 (57)
- python多态 (60)
- python匿名函数 (59)
- python打印九九乘法表 (65)
- python赋值 (62)
- python异常 (69)
- python元祖 (57)
