在Python中使用Asyncio系统(3-6)优雅地开启和关闭协程
off999 2025-04-27 15:36 76 浏览 0 评论
优雅地开启和关闭协程
大多数基于异步的程序都是基于网络的并且长期运行的应用程序。这个领域在处理如何启动和关闭的过程中有惊人的复杂性。
在这两个操作中,启动一般比较简单。启动异步应用程序的标准方式是有一个main()协程函数,并用asyncio.run()调用它,就像本章开头的示例3-2所演示的那样。
一般来说,启动都会相当的简单直接;比如前面描述的服务器案例,你可能在文档中不止一次的阅读到它。我们待会儿要在后面的章节简要通过代码来介绍一个服务器启动的演示。
关闭程序一般更复杂一些。为了关闭,我之前提到了在asyncio.run()中的准备步骤。当async def main()函数退出时,要采取以下操作:
- 收集所有仍在排队的任务对象(如果有的话)。
- 取消这些任务(这个步骤会在每个运行的协程中抛出CancelledError异常,就是你在协程函数的代码里使用try/except来处理的那个异常)。
- 把所有这些任务放到一个组任务中。
- 在组任务中使用run_until_complete()等待所有这些任务完成,然后抛出或者处理CancelledError。
asyncio.run()为你执行上面提到的这些操作,但尽管有这些帮助,在构建最初几个重要的asyncio应用程序的过程中,仍将尝试在关闭期间清除错误消息,就像这样:“Task was destroyed but it is pending!”。发生这种情况是因为应用程序没等到执行完前面的一个或几个步骤就关闭主程序了。例3-29是一个引发这个烦人错误的示例。
示例 3-29 未决任务的销毁程序
# taskwarning.py
import asyncio
async def f(delay):
await asyncio.sleep(delay)
loop = asyncio.get_event_loop()
t1 = loop.create_task(f(1))
t2 = loop.create_task(f(2))
loop.run_until_complete(t1)
loop.close()- (L8)任务1将运行1秒。
- (L9)任务2将运行2秒。
- (L10)只能在任务1完成前才运行。
运行这段代码将会有以下输出:
$ python taskwarning.py
Task was destroyed but it is pending!
task: <Task pending coro=<f() done, defined at [...snip...]>这个错误信息告诉你当循环关闭时,有些任务还没有完成。我们想要避免这种情况,这就是为什么一般的关闭过程是收集所有未完成的任务,终止这些未完成的任务,然后等它们在关闭循环之前全部完成。asyncio.run()为你完成了所有这些步骤,但重要的是要详细了解流程,这样你就能够处理更复杂的情况。
我们再看一个更详细的演示了所有这些阶段的代码示例。示例3-30是一个基于telnet的echo服务器的迷你案例研究。
示例 3-30 异步应用程序的生命周期
# telnetdemo.py
import asyncio
from asyncio import StreamReader, StreamWriter
async def echo(reader: StreamReader, writer: StreamWriter):
print('New connection.')
try:
while data := await reader.readline():
writer.write(data.upper())
await writer.drain()
print('Leaving Connection.')
except asyncio.CancelledError:
print('Connection dropped!')
async def main(host='127.0.0.1', port=8888):
server = await asyncio.start_server(echo, host, port)
async with server:
await server.serve_forever()
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Bye!')- (L5) 服务器将使用echo()协程函数为每个连接创建一个协程。这个函数使用streams API与asyncio进行网络连接。
- (L8) 为了保持连接的活性,我们要使用一个无限循环来等待消息接入。
- (L9) 然后把数据直接原样返回给发送方,就是把字母全部变成大写。
- (L12) 如果这个任务被终止,我们会打印一条消息。
- (L16) 这段启动TCP服务器的代码直接取自Python 3.8的官方文档。
启动echo服务器后,你可以使用telnet连接到它并跟它交互:
$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hi!
HI!
stop shouting
STOP SHOUTING
^]
telnet> q/
Connection closed.这次会话的服务器输出如下所示(服务器继续运行,直到我们按Ctrl-C):
$ python telnetdemo.py
New connection.
Leaving Connection.
^CBye!在刚才展示的Telnet会话中,客户机(即Telnet)在服务器停止之前关闭了连接,现在让我们看看如果在连接处于活跃状态时关闭服务器会发生什么。我们会看到服务器进程的如下输出:
$ python telnetdemo.py
New connection.
^CConnection dropped!
Bye!这时你可以看到CancelledError的异常处理程序被触发。现在我们假设这是一个真实的生产级应用程序,我们希望把所有关于断开连接的事件发送到监控服务。代码示例可能被修改为示例3-31。
示例 3-31 在取消步骤中创建任务
# telnetdemo.py
import asyncio
from asyncio import StreamReader, StreamWriter
async def send_event(msg: str):
await asyncio.sleep(1)
async def echo(reader: StreamReader, writer: StreamWriter):
print('New connection.')
try:
while (data := await reader.readline()):
writer.write(data.upper())
await writer.drain()
print('Leaving Connection.')
except asyncio.CancelledError:
msg = 'Connection dropped!'
print(msg)
asyncio.create_task(send_event(msg))
async def main(host='127.0.0.1', port=8888):
server = await asyncio.start_server(echo, host, port)
async with server:
await server.serve_forever()
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Bye!')- (L5) 假设这个协程实际上要联系外部服务器以提交事件通知。
- (L18) 因为事件通知器涉及网络访问,所以这样的调用通常是在单独的异步任务中进行的;这就是我们在这里使用create_task()函数的原因。
但是,这段代码有一个错误。如果我们重新运行这个示例,并确保在连接处于活跃状态时停止服务器(使用Ctrl-C),这个bug就变得很明显了:
$ python telnetdemo.py
New connection.
^CConnection dropped!
Bye!
Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<send_event() done, ...>要理解为什么会发生这种情况,我们必须回到asyncio.run()在关闭阶段所做的清理事件的顺序;特别要注意更重要的部分是,当我们按下Ctrl-C时,所有当前活跃的任务都会被收集和注销。这时候,只有那些当前活跃的任务会被等待直到完成,并且asyncio.run()在所有那些任务完成之后才返回。修改后的代码中的错误是,我们在现有的“echo”任务的注销处理步骤中创建了一个发送事件的新任务。只有在asyncio.run()收集并注销了流程中的所有任务之后,才会创建这个新任务。
这就是为什么了解asyncio.run()是怎么运行的很重要。
建议:一般的经验法则是,尽量避免在CancelledError异常处理程序中创建新任务。如果必须,也一定要等待同一函数范围内的新任务或未来任务。
最后:如果你正在使用一个库或框架,请确保按照它的文档来执行启动和关闭操作。第三方框架通常提供自己的启动和关闭函数,并提供自定义的事件钩子。你可以在第115页的“案例研究:缓存失效”中看到Sanic框架中使用这些钩子的例子。
gather()函数中的return_exceptions=True是什么意思?
你可能已经注意到,示例3-3和示例3-1,在关闭步骤中调用gather()时的关键字参数return_exceptions=True,但我当时故意悄悄地没有提到它。asyncio.run()也在内部使用gather()和return_exceptions=True,现在是进一步讨论这个的时候了。
不幸的是,这个参数默认是gather(…return_exceptions = False)。这个默认值对于大多数情况都是有问题的,包括关闭过程,这就是为什么asyncio.run()把参数设置为True。直接解释有点复杂;相反,让我们通过一系列详细解释来理解以上的观点会更容易:
- run_until_complete()函数里面操作的是一个future;在关闭任务期间,它是gather()返回的future。
- 如果里面的future引发异常,这个异常就会从run_until_complete()抛出,这将导致循环停止运行。
- 如果run_until_complete()函数操作的是一个组future,在任何子任务中引发的任何异常如果不在子任务中处理掉,就会在这个组future中引发异常。注意,这种情况也包括CancelledError。
- 如果只有一部分任务处理CancelledError,而其他任务不处理CancelledError,那么不处理CancelledError的任务将导致循环停止。这意味着循环在所有任务完成之前就会异常终止。
- 在关闭任务的过程中,我们真的不想发生这种行为。我们希望run_until_complete()只在在它组中的所有子任务都已完成时才能终止,不管其中的一些任务是否引发异常。
- 于是我们gather(*, return_exceptions=True):这个设置会让组future把来自于子任务的异常视为返回值,这样它们就不会冒出来干扰到run_until_complete()。
这样就知道了return_exceptions=True和run_until_complete()之间的关系。用这种方式捕获异常的一个不良后果是:有些错误可能没有引起你的注意,因为它们现在正在一组任务的内部处理。如果这是一个麻烦,你可以从run_until_complete()获取输出列表,并扫描它以查找Exception的任何子类,然后编写适合你情况的日志消息。示例3-32演示了这种方法。
示例 3-32 所有的任务都会完成
# alltaskscomplete.py
import asyncio
async def f(delay):
await asyncio.sleep(1 / delay)
return delay
loop = asyncio.get_event_loop()
for i in range(10):
loop.create_task(f(i))
pending = asyncio.all_tasks()
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f'Results: {results}')
loop.close()- (L5) 这里一定会触发一个异常
这是运行输出:
$ python alltaskscomplete.py
Results: [6, 9, 3, 7, ...
ZeroDivisionError('division by zero',), 4, ...
8, 1, 5, 2]如果没有设置return_exceptions=True,就会从run_until_complete()引发ZeroDivisionError,停止循环,从而阻止其他任务完成。
在下一节中,我们将讨论信号处理(KeyboardInterrupt之外的信号),但在此之前,有必要记住,优雅地关闭是网络编程中比较困难的方面之一,对于asyncio来说也是这样。本节中的信息仅仅是一个开始。我鼓励你在自己的自动化测试中使用明确的关闭测试。不同的应用程序通常需要不同的策略。
建议:我在Python包索引(PyPI)上发布了一个名为aiorun的小包,主要用于我自己在处理asyncio关闭方面的实验和学习,它整合了本节中的许多想法。可能对于你在代码进行修改,并围绕asyncio关闭场景试验你自己的想法方面是有用的。
相关推荐
- 阿里云国际站ECS:阿里云ECS如何提高网站的访问速度?
-
TG:@yunlaoda360引言:速度即体验,速度即业务在当今数字化的世界中,网站的访问速度已成为决定用户体验、用户留存乃至业务转化率的关键因素。页面加载每延迟一秒,都可能导致用户流失和收入损失。对...
- 高流量大并发Linux TCP性能调优_linux 高并发网络编程
-
其实主要是手里面的跑openvpn服务器。因为并没有明文禁p2p(哎……想想那么多流量好像不跑点p2p也跑不完),所以造成有的时候如果有比较多人跑BT的话,会造成VPN速度急剧下降。本文所面对的情况为...
- 性能测试100集(12)性能指标资源使用率
-
在性能测试中,资源使用率是评估系统硬件效率的关键指标,主要包括以下四类:#性能测试##性能压测策略##软件测试#1.CPU使用率定义:CPU处理任务的时间占比,计算公式为1-空闲时间/总...
- Linux 服务器常见的性能调优_linux高性能服务端编程
-
一、Linux服务器性能调优第一步——先搞懂“看什么”很多人刚接触Linux性能调优时,总想着直接改配置,其实第一步该是“看清楚问题”。就像医生看病要先听诊,调优前得先知道服务器“哪里...
- Nginx性能优化实战:手把手教你提升10倍性能!
-
关注△mikechen△,十余年BAT架构经验倾囊相授!Nginx是大型架构而核心,下面我重点详解Nginx性能@mikechen文章来源:mikechen.cc1.worker_processe...
- 高并发场景下,Spring Cloud Gateway如何抗住百万QPS?
-
关注△mikechen△,十余年BAT架构经验倾囊相授!大家好,我是mikechen。高并发场景下网关作为流量的入口非常重要,下面我重点详解SpringCloudGateway如何抗住百万性能@m...
- Kubernetes 高并发处理实战(可落地案例 + 源码)
-
目标场景:对外提供HTTPAPI的微服务在短时间内收到大量请求(例如每秒数千至数万RPS),要求系统可弹性扩容、限流降级、缓存减压、稳定运行并能自动恢复。总体思路(多层防护):边缘层:云LB...
- 高并发场景下,Nginx如何扛住千万级请求?
-
Nginx是大型架构的必备中间件,下面我重点详解Nginx如何实现高并发@mikechen文章来源:mikechen.cc事件驱动模型Nginx采用事件驱动模型,这是Nginx高并发性能的基石。传统...
- Spring Boot+Vue全栈开发实战,中文版高清PDF资源
-
SpringBoot+Vue全栈开发实战,中文高清PDF资源,需要的可以私我:)SpringBoot致力于简化开发配置并为企业级开发提供一系列非业务性功能,而Vue则采用数据驱动视图的方式将程序...
- Docker-基础操作_docker基础实战教程二
-
一、镜像1、从仓库获取镜像搜索镜像:dockersearchimage_name搜索结果过滤:是否官方:dockersearch--filter="is-offical=true...
- 你有空吗?跟我一起搭个服务器好不好?
-
来人人都是产品经理【起点学院】,BAT实战派产品总监手把手系统带你学产品、学运营。昨天闲的没事的时候,随手翻了翻写过的文章,发现一个很严重的问题。就是大多数时间我都在滔滔不绝的讲理论,却很少有涉及动手...
- 部署你自己的 SaaS_saas如何部署
-
部署你自己的VPNOpenVPN——功能齐全的开源VPN解决方案。(DigitalOcean教程)dockovpn.io—无状态OpenVPNdockerized服务器,不需要持久存储。...
- Docker Compose_dockercompose安装
-
DockerCompose概述DockerCompose是一个用来定义和管理多容器应用的工具,通过一个docker-compose.yml文件,用YAML格式描述服务、网络、卷等内容,...
- 京东T7架构师推出的电子版SpringBoot,从构建小系统到架构大系统
-
前言:Java的各种开发框架发展了很多年,影响了一代又一代的程序员,现在无论是程序员,还是架构师,使用这些开发框架都面临着两方面的挑战。一方面是要快速开发出系统,这就要求使用的开发框架尽量简单,无论...
- Kubernetes (k8s) 入门学习指南_k8s kubeproxy
-
Kubernetes(k8s)入门学习指南一、什么是Kubernetes?为什么需要它?Kubernetes(k8s)是一个开源的容器编排系统,用于自动化部署、扩展和管理容器化应用程序。它...
欢迎 你 发表评论:
- 一周热门
-
-
抖音上好看的小姐姐,Python给你都下载了
-
全网最简单易懂!495页Python漫画教程,高清PDF版免费下载
-
Python 3.14 的 UUIDv6/v7/v8 上新,别再用 uuid4 () 啦!
-
python入门到脱坑 输入与输出—str()函数
-
宝塔面板如何添加免费waf防火墙?(宝塔面板开启https)
-
Python三目运算基础与进阶_python三目运算符判断三个变量
-
(新版)Python 分布式爬虫与 JS 逆向进阶实战吾爱分享
-
失业程序员复习python笔记——条件与循环
-
慕ke 前端工程师2024「完整」
-
飞牛NAS部署TVGate Docker项目,实现内网一键转发、代理、jx
-
- 最近发表
- 标签列表
-
- python计时 (73)
- python安装路径 (56)
- python类型转换 (93)
- python进度条 (67)
- python吧 (67)
- python的for循环 (65)
- python格式化字符串 (61)
- python静态方法 (57)
- python列表切片 (59)
- python面向对象编程 (60)
- python 代码加密 (65)
- python串口编程 (77)
- python封装 (57)
- python写入txt (66)
- python读取文件夹下所有文件 (59)
- python操作mysql数据库 (66)
- python获取列表的长度 (64)
- python接口 (63)
- python调用函数 (57)
- python多态 (60)
- python匿名函数 (59)
- python打印九九乘法表 (65)
- python赋值 (62)
- python异常 (69)
- python元祖 (57)
