24-4-Python多线程-进程操作-案例
off999 2025-04-29 03:19 37 浏览 0 评论
4-1-介绍
4-1-1-Python的进程
虽然Python语言支持创建多线程应用程序,但是Python解释器使用了内部的全局解释器锁定(GIL),在任意指定的时刻只允许执行单个线程,并且限制了Python程序只能在一个处理器上运行。而现代CPU已经以多核为主,但Python的多线程程序无法使用。使用Python的多进程模块可以将工作分派给不受锁定限制的单个子进程。
4-1-2-创建进程的模块
- 在Python 3语言中,对多进程支持的是multiprocessing模块和subprocess模块。
- 使用模块multiprocessing可以创建并使用多进程,具体用法和模块threading的使用方法类似。
- 创建进程使用multiprocessing.Process对象来完成,和threading.Thread一样,可以使用它以进程方式运行函数,也可以通过继承它并重载run()方法创建进程。
- 模块multiprocessing同样具有模块threading中用于同步的Lock、RLock及用于通信的Event
4-2-subprocess模块
4-1-1-subprocess介绍
subprocess 模块是 Python 标准库中的一个强大工具,
用于创建新的进程、连接到它们的输入输出错误管道,并获取它们的返回码。
它允许你在 Python 脚本中执行外部命令,就像在终端中执行一样。
4-1-1-1-基本使用方法
1-subprocess.run()
这是 Python 3.5 及以后版本推荐的执行外部命令的方式。
它会等待命令执行完成,并返回一个 CompletedProcess 对象,该对象包含命令的执行结果信息。
在上述代码中,subprocess.run()接受一个列表作为参数,列表中的第一个元素是要执行的命令,后续元素是命令的参数。capture_output=True 表示捕获命令的标准输出和标准错误输出,text=True表示以文本模式处理输出。
2-subprocess.Popen()
Popen类提供了更底层的进程创建和控制功能,它允许你在命令执行过程中与其进行交互。
import subprocess
# 创建一个新的进程
process = subprocess.Popen(['ping', 'www.baidu.com'], stdout=subprocess.PIPE, text=True)
# 读取命令的输出
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
print(output.strip())
# 等待进程结束并获取返回码
returncode = process.wait()
print(f"命令返回码: {returncode}")在这个例子中,subprocess.Popen()创建了一个新的进程来执行 `ping` 命令,并通过 stdout=subprocess.PIPE 将命令的标准输出重定向到一个管道,以便在 Python 脚本中读取。
4-1-1-2-常用参数
args:要执行的命令,可以是字符串或列表。如果是列表,第一个元素是命令名,后续元素是命令的参数。
stdin、stdout、stderr:用于指定标准输入、标准输出和标准错误输出的处理方式。可以设置为 `subprocess.PIPE` 以捕获输出,或者设置为 `subprocess.DEVNULL` 以丢弃输出。
shell:如果设置为 `True`,则通过 shell 执行命令。默认值为 `False`。使用 `shell=True` 时要注意安全问题,因为可能会导致命令注入。
capture_output:如果设置为 `True`,则捕获命令的标准输出和标准错误输出。这是 Python 3.7 及以后版本的参数。
text:如果设置为 `True`,则以文本模式处理输入和输出,默认以字节模式处理。
4-1-2-异常处理
在使用 subprocess 模块时,可能会抛出一些异常,例如 FileNotFoundError表示找不到要执行的命令,CalledProcessError表示命令执行失败(返回码不为 0)。以下是一个异常处理的示例:
import subprocess
try:
result = subprocess.run(['nonexistent_command'], check=True, capture_output=True, text=True)
except FileNotFoundError:
print("命令未找到")
except subprocess.CalledProcessError as e:
print(f"命令执行失败,返回码: {e.returncode}")
print(e.stderr)在上述代码中,check=True表示如果命令执行失败(返回码不为 0),则抛出 CalledProcessError 异常。
4-1-3-实际应用场景
自动化脚本:在 Python 脚本中执行系统命令,实现自动化部署、文件处理等任务。
调用外部程序:调用其他编程语言编写的程序,并与其进行交互。
系统监控:执行系统命令获取系统信息,如 CPU 使用率、内存使用情况等。
4-3-multiprocessing模块
multiprocessing是 Python 标准库中的一个模块,它允许你生成多个进程,以实现并行计算,克服了 Python 中全局解释器锁(GIL)对多线程并行计算的限制,从而在多核 CPU 上充分利用系统资源,提高程序的运行效率。以下为你详细介绍该模块:
4-3-1-基本使用
4-3-1-1-创建进程
你可以通过 multiprocessing.Process 类来创建新的进程,下面是一个简单示例:
import multiprocessing
def worker(num):
"""进程要执行的任务"""
print(f'Worker {num} started')
print(f'Worker {num} finished')
if __name__ == '__main__':
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()在上述代码里,multiprocessing.Process 类接收两个重要参数,target 是进程要执行的函数,args 是传递给该函数的参数。start() 方法用于启动进程,join() 方法会让主进程等待子进程执行完毕。
4-3-1-2-进程间通信
multiprocessing模块提供了多种进程间通信(IPC)的方式,常见的有队列(`Queue`)和管道(`Pipe`)。
4-3-1-2-1-使用队列进行通信
import multiprocessing
def producer(q):
for i in range(5):
q.put(i)
print(f'Produced {i}')
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f'Consumed {item}')
if __name__ == '__main__':
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(q,))
p2 = multiprocessing.Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
q.put(None)
p2.join()在这个例子中,producer 函数将数据放入队列,consumer 函数从队列中取出数据进行处理。
4-3-1-2-2-使用管道进行通信
import multiprocessing
def sender(conn):
messages = ['Hello', 'World', '!']
for msg in messages:
conn.send(msg)
conn.close()
def receiver(conn):
while True:
try:
msg = conn.recv()
if not msg:
break
print(f'Received: {msg}')
except EOFError:
break
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=(child_conn,))
p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()这里通过 multiprocessing.Pipe() 创建了一个管道,管道有两个连接对象,分别用于发送和接收数据。
4-3-1-3-进程池
当你需要处理大量任务时,手动创建和管理每个进程会很繁琐,此时可以使用 multiprocessing.Pool类来创建进程池。
import multiprocessing
def square(x):
return x * x
if __name__ == '__main__':
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(square, [1, 2, 3, 4, 5])
print(results)pool.map() 方法会将可迭代对象中的每个元素依次传递给指定的函数进行处理,并返回处理结果的列表。
4-3-2-注意事项
4-3-2-1-if __name__ == '__main__'
在 Windows 和某些其他操作系统上,使用 `multiprocessing` 模块时,必须将创建和启动进程的代码放在 `if __name__ == '__main__':` 语句块中,以避免递归创建进程。
4-3-2-2-资源管理
进程会占用系统资源,因此要合理控制进程的数量,避免资源耗尽。
使用 join() 方法确保子进程执行完毕,防止僵尸进程的产生。
5-案例
5-1-需求
多线程实现抢票功能
5-2-代码
5-2-1-database_utils.py
该文件主要负责数据库连接池的配置和数据库查询操作。
import logging
import mysql.connector
from mysql.connector import pooling
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 数据库连接池配置
dbconfig = {
"host": "localhost",
"user": "your_username",
"password": "your_password",
"database": "your_database",
"pool_name": "mypool",
"pool_size": 5,
"connect_timeout": 10
}
pool = pooling.MySQLConnectionPool(**dbconfig)
def execute_query(query, params=None):
try:
# 从连接池获取连接
mydb = pool.get_connection()
mycursor = mydb.cursor()
if params:
mycursor.execute(query, params)
else:
mycursor.execute(query)
if query.strip().lower().startswith("select"):
result = mycursor.fetchall()
else:
mydb.commit()
result = None
return result
except mysql.connector.errors.InterfaceError as err:
if err.errno == 2003:
logging.error(f"数据库连接超时: {err}")
else:
logging.error(f"数据库接口错误: {err}")
return None
except mysql.connector.Error as err:
logging.error(f"数据库操作出错: {err}")
return None
except Exception as e:
logging.error(f"执行查询时出现未知错误: {e}")
return None
finally:
if 'mydb' in locals() and mydb.is_connected():
mycursor.close()
mydb.close()
5-2-2-user_utils.py
from database_utils import execute_query
# 用户信息类
class User:
def __init__(self, user_id, name, id_number):
self.user_id = user_id
self.name = name
self.id_number = id_number
def __str__(self):
return f"用户ID: {self.user_id}, 姓名: {self.name}, 身份证号: {self.id_number}"
def get_users_from_db():
try:
query = "SELECT user_id, name, id_number FROM users"
result = execute_query(query)
users = []
if result:
for user_id, name, id_number in result:
user = User(user_id, name, id_number)
users.append(user)
return users
except Exception as e:
logging.error(f"获取用户信息时出错: {e}")
return []
5-2-3-train_utils.py
该文件负责获取车次票信息和更新车次票信息。
import logging
from database_utils import execute_query
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_trains_from_db():
try:
query = "SELECT train_number, seat_type, tickets FROM train_tickets"
result = execute_query(query)
trains = {}
if result:
for train_number, seat_type, tickets in result:
if train_number not in trains:
trains[train_number] = {}
trains[train_number][seat_type] = tickets
return trains
except Exception as e:
logging.error(f"获取车次票信息时出错: {e}")
return {}
def update_ticket_in_db(train_number, seat_type, new_tickets):
try:
query = "UPDATE train_tickets SET tickets = %s WHERE train_number = %s AND seat_type = %s"
params = (new_tickets, train_number, seat_type)
execute_query(query, params)
logging.info(f"{train_number} 次列车的 {seat_type} 座位票数已更新为 {new_tickets}。")
except Exception as e:
logging.error(f"更新车次票信息时出错: {e}")
5-2-4-ticket_request_utils.py
该文件用于获取车票请求信息。
from database_utils import execute_query
from user_utils import get_users_from_db
def get_ticket_requests_from_db(user_id=None):
try:
if user_id is None:
query = "SELECT user_id, train_number, seat_type FROM ticket_requests"
params = None
else:
query = "SELECT user_id, train_number, seat_type FROM ticket_requests WHERE user_id = %s"
params = (user_id,)
result = execute_query(query, params)
users = get_users_from_db()
user_dict = {user.user_id: user for user in users}
ticket_requests = []
for user_id, train_number, seat_type in result:
if user_id in user_dict:
ticket_requests.append((user_dict[user_id], train_number, seat_type))
return ticket_requests
except Exception as e:
logging.error(f"获取抢票请求时出错: {e}")
return []
5-2-5-main.py
该文件是主程序入口,负责启动线程进行抢票操作。
import threading
import logging
from train_utils import get_trains_from_db, update_ticket_in_db
from user_utils import User
from ticket_request_utils import get_ticket_requests_from_db
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 创建一个线程锁,用于线程同步
lock = threading.Lock()
def book_ticket(user, train_number, seat_type):
trains = get_trains_from_db()
result = None
if train_number in trains and seat_type in trains[train_number]:
if trains[train_number][seat_type] > 0:
try:
# 模拟一些处理时间
import time
time.sleep(0.1)
# 使用上下文管理器管理锁
with lock:
# 再次检查票数,防止竞态条件
trains = get_trains_from_db()
if train_number in trains and seat_type in trains[train_number] and trains[train_number][seat_type] > 0:
new_tickets = trains[train_number][seat_type] - 1
trains[train_number][seat_type] = new_tickets
result = f"{user} 抢到了 {train_number} 次列车的 {seat_type} 座位,剩余 {seat_type} 票数: {new_tickets}"
logging.info(result)
update_ticket_in_db(train_number, seat_type, new_tickets)
else:
result = f"{user} 抢票失败,{train_number} 次列车的 {seat_type} 座位已售罄。"
logging.info(result)
except Exception as e:
result = f"抢票过程中出现异常: {e}"
logging.error(result)
else:
result = f"{user} 抢票失败,{train_number} 次列车的 {seat_type} 座位已售罄。"
logging.info(result)
else:
result = f"{user} 抢票失败,车次 {train_number} 或座位类型 {seat_type} 不存在。"
logging.info(result)
# 将抢票结果保存到数据库
from database_utils import execute_query
query = "INSERT INTO ticket_booking_results (user_id, train_number, seat_type, result) VALUES (%s, %s, %s, %s)"
params = (user.user_id, train_number, seat_type, result)
execute_query(query, params)
logging.info("抢票结果已保存到数据库。")
if __name__ == "__main__":
try:
# 示例:获取用户 ID 为 1 的车票请求
user_id = 1
ticket_requests = get_ticket_requests_from_db(user_id)
threads = []
for user, train_number, seat_type in ticket_requests:
try:
# 创建线程
t = threading.Thread(target=book_ticket, args=(user, train_number, seat_type))
threads.append(t)
# 启动线程
t.start()
except Exception as e:
logging.error(f"创建或启动线程时出错: {e}")
# 等待所有线程完成
for t in threads:
try:
t.join()
except Exception as e:
logging.error(f"等待线程完成时出错: {e}")
logging.info("抢票结束。")
except Exception as e:
logging.error(f"主程序执行出错: {e}")
- 上一篇:Python多进程的实现
- 下一篇:一文看懂Python中异步、进程、线程、队列
相关推荐
- 安全教育登录入口平台(安全教育登录入口平台官网)
-
122交通安全教育怎么登录:122交通网的注册方法是首先登录网址http://www.122.cn/,接着打开网页后,点击右上角的“个人登录”;其次进入邮箱注册,然后进入到注册页面,输入相关信息即可完...
- 大鱼吃小鱼经典版(大鱼吃小鱼经典版(经典版)官方版)
-
大鱼吃小鱼小鱼吃虾是于谦跟郭麒麟的《我的棒儿呢?》郭德纲说于思洋郭麒麟作诗的相声,最后郭麒麟做了一首,师傅躺在师母身上大鱼吃小鱼小鱼吃虾虾吃水水落石出师傅压师娘师娘压床床压地地动山摇。...
-
- 哪个软件可以免费pdf转ppt(免费的pdf转ppt软件哪个好)
-
要想将ppt免费转换为pdf的话,我们建议大家可以下一个那个wps,如果你是会员的话,可以注册为会员,这样的话,在wps里面的话,就可以免费将ppt呢转换为pdfpdf之后呢,我们就可以直接使用,不需要去直接不需要去另外保存,为什么格式转...
-
2026-02-04 09:03 off999
- 电信宽带测速官网入口(电信宽带测速官网入口app)
-
这个网站看看http://www.swok.cn/pcindex.jsp1.登录中国电信网上营业厅,宽带光纤,贴心服务,宽带测速2.下载第三方软件,如360等。进行在线测速进行宽带测速时,尽...
- 植物大战僵尸95版手机下载(植物大战僵尸95 版下载)
-
1可以在应用商店或者游戏平台上下载植物大战僵尸95版手机游戏。2下载教程:打开应用商店或者游戏平台,搜索“植物大战僵尸95版”,找到游戏后点击下载按钮,等待下载完成即可安装并开始游戏。3注意:确...
- 免费下载ppt成品的网站(ppt成品免费下载的网站有哪些)
-
1、Chuangkit(chuangkit.com)直达地址:chuangkit.com2、Woodo幻灯片(woodo.cn)直达链接:woodo.cn3、OfficePlus(officeplu...
- 2025世界杯赛程表(2025世界杯在哪个国家)
-
2022年卡塔尔世界杯赛程公布,全部比赛在卡塔尔境内8座球场举行,2022年,决赛阶段球队全部确定。揭幕战于当地时间11月20日19时进行,由东道主卡塔尔对阵厄瓜多尔,决赛于当地时间12月18日...
- 下载搜狐视频电视剧(搜狐电视剧下载安装)
-
搜狐视频APP下载好的视频想要导出到手机相册里方法如下1、打开手机搜狐视频软件,进入搜狐视频后我们点击右上角的“查找”,找到自已喜欢的视频。2、在“浏览器页面搜索”窗口中,输入要下载的视频的名称,然后...
- 永久免费听歌网站(丫丫音乐网)
-
可以到《我爱音乐网》《好听音乐网》《一听音乐网》《YYMP3音乐网》还可以到《九天音乐网》永久免费听歌软件有酷狗音乐和天猫精灵,以前要跳舞经常要下载舞曲,我从QQ上找不到舞曲下载就从酷狗音乐上找,大多...
- 音乐格式转换mp3软件(音乐格式转换器免费版)
-
有两种方法:方法一在手机上操作:1、进入手机中的文件管理。2、在其中选择“音乐”,将显示出手机中的全部音乐。3、点击“全选”,选中所有音乐文件。4、点击屏幕右下方的省略号图标,在弹出菜单中选择“...
- 电子书txt下载(免费的最全的小说阅读器)
-
1.Z-library里面收录了近千万本电子书籍,需求量大。2.苦瓜书盘没有广告,不需要账号注册,使用起来非常简单,直接搜索预览下载即可。3.鸠摩搜书整体风格简洁清晰,书籍资源丰富。4.亚马逊图书书籍...
- 最好免费观看高清电影(播放免费的最好看的电影)
-
在目前的网上选择中,IMDb(互联网电影数据库)被认为是最全的电影网站之一。这个网站提供了各种类型的电影和电视节目的海量信息,包括剧情介绍、演员表、评价、评论等。其还提供了有关电影制作背后的详细信息,...
- 孤单枪手2简体中文版(孤单枪手2简体中文版官方下载)
-
要将《孤胆枪手2》游戏的征兵秘籍切换为中文,您可以按照以下步骤进行操作:首先,打开游戏设置选项,通常可以在游戏主菜单或游戏内部找到。然后,寻找语言选项或界面选项,点击进入。在语言选项中,选择中文作为游...
欢迎 你 发表评论:
- 一周热门
- 最近发表
- 标签列表
-
- python计时 (73)
- python安装路径 (56)
- python类型转换 (93)
- python进度条 (67)
- python吧 (67)
- python的for循环 (65)
- python格式化字符串 (61)
- python静态方法 (57)
- python列表切片 (59)
- python面向对象编程 (60)
- python 代码加密 (65)
- python串口编程 (77)
- python封装 (57)
- python写入txt (66)
- python读取文件夹下所有文件 (59)
- python操作mysql数据库 (66)
- python获取列表的长度 (64)
- python接口 (63)
- python调用函数 (57)
- python多态 (60)
- python匿名函数 (59)
- python打印九九乘法表 (65)
- python赋值 (62)
- python异常 (69)
- python元祖 (57)
