python定时任务最强框架APScheduler详细教程
off999 2025-07-23 17:38 43 浏览 0 评论
APScheduler定时任务
上次测试女神听了我的建议,已经做好了要给项目添加定时任务的决定了。但是之前提供的四种方式中,她不知道具体选择哪一个。为了和女神更近一步,我把我入行近10年收藏的干货免费拿出来分享给女神,希望女神凌晨2点再找我的时候,不再是因为要给他调程序了。
Python中定时任务的解决方案,总体来说有四种,分别是:crontab、 scheduler、 Celery、 APScheduler,其中 crontab不适合多台服务器的配置、scheduler太过于简单、 Celery依赖的软件比较多,比较耗资源。最好的解决方案就是 APScheduler。
APScheduler使用起来十分方便。提供了基于日期、固定时间间隔以及 crontab类型的任务。还可以在程序运行过程中动态的新增任务和删除任务。在任务运行过程中,还可以把任务存储起来,下次启动运行依然保留之前的状态。另外最重要的一个特点是,因为他是基于 Python语言的库,所以是可以跨平台的,一段代码,处处运行!
在这里我来给大家详细介绍一下具体的用法。
一、安装:
安装非常简单,通过 pip install apscheduler即可。
二、基本使用:
先来看一段代码,然后再来给大家详细讲解其中的细节:
其中 BlockingScheduler是阻塞性的调度器,是最基本的调度器,下面调用 start方法就会阻塞当前进程,所以如果你的程序除了调度进程没有其他后台进程,那么是可以是否的,否则这个调度器会阻塞你程序的正常执行。
接下来就是定义一个 my_clock函数,这个函数就是需要定时调度的任务代码。
然后就是实例化一个 BlockingScheduler对象,并把 my_clock添加到任务调度中。然后看 interval参数,这里用的是间隔的方式来调度,调度频率是 seconds=3,也就是每3秒执行一次。
执行结果如下:
可以看到每隔3秒钟的时间会执行一次。说明定时任务已经成功执行了!
在了解了 APScheduler的基本使用后,再来对 APScheduler的四个基本对象做个了解,这样才能从全局掌握 APScheduler。
三、四个基本对象:
1. 触发器(triggers):
触发器就是根据你指定的触发方式,比如是按照时间间隔,还是按照 crontab触发,触发条件是什么等。每个任务都有自己的触发器。
2. 任务存储器(job stores):
任务存储器是可以存储任务的地方,默认情况下任务保存在内存,也可将任务保存在各种数据库中。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。
3. 执行器(executors):
执行器的目的是安排任务到线程池或者进程池中运行的。
4. 调度器(schedulers):
任务调度器是属于整个调度的总指挥官。他会合理安排作业存储器、执行器、触发器进行工作,并进行添加和删除任务等。调度器通常是只有一个的。开发人员很少直接操作触发器、存储器、执行器等。因为这些都由调度器自动来实现了。
四、触发器:
触发器有两种,第一种是 interval,第二种是 crontab。interval可以具体指定多少时间间隔执行一次。crontab可以指定执行的日期策略。以下分别进行讲解。
1. date触发器:
在某个日期时间只触发一次事件。示例代码如下:
更多请参考:
https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html
2. interval触发器:
想要在固定的时间间隔触发事件。interval的触发器可以设置以下的触发参数:
weeks:周。整形。
days:一个月中的第几天。整形。
hours:小时。整形。
minutes:分钟。整形。
seconds:秒。整形。
start_date:间隔触发的起始时间。
end_date:间隔触发的结束时间。
jitter:触发的时间误差。
在每天的11点24分触发事件。更多请参考:
https://apscheduler.readthedocs.io/en/stable/modules/triggers/interval.html
3. crontab触发器:
在某个确切的时间周期性的触发事件。可以使用的参数如下:
year:4位数字的年份。
month:1-12月份。
day:1-31日。
week:1-53周。
day_of_week:一个礼拜中的第几天( 0-6或者 mon、 tue、 wed、 thu、 fri、 sat、 sun)。
hour: 0-23小时。
minute: 0-59分钟。
second: 0-59秒。
start_date: datetime类型或者字符串类型,起始时间。
end_date: datetime类型或者字符串类型,结束时间。
timezone:时区。
jitter:任务触发的误差时间。
也可以用表达式类型,可以用以下方式:
示例如下:
五、调度器:
BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start函数会阻塞当前线程,不能立即返回。
BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start后主线程不会阻塞。
AsyncIOScheduler:适用于使用了 asyncio模块的应用程序。
GeventScheduler:适用于使用 gevent模块的应用程序。
TwistedScheduler:适用于构建 Twisted的应用程序。
QtScheduler:适用于构建 Qt的应用程序。
六、任务存储器:
任务存储器的选择有两种。一是内存,也是默认的配置。二是数据库。使用内存的方式是简单高效,但是不好的是,一旦程序出现问题,重新运行的话,会把之前已经执行了的任务重新执行一遍。数据库则可以在程序崩溃后,重新运行可以从之前中断的地方恢复正常运行。有以下几种选择:
MemoryJobStore:没有序列化,任务存储在内存中,增删改查都是在内存中完成。
SQLAlchemyJobStore:使用 SQLAlchemy这个 ORM框架作为存储方式。
MongoDBJobStore:使用 mongodb作为存储器。
RedisJobStore:使用 redis作为存储器。
七、执行器:
执行器的选择取决于应用场景。通常默认的 ThreadPoolExecutor已经在大部分情况下是可以满足我们需求的。如果我们的任务涉及到一些 CPU密集计算的操作。那么应该考虑 ProcessPoolExecutor。然后针对每种程序, apscheduler也设置了不同的 executor:
ThreadPoolExecutor:线程池执行器。
ProcessPoolExecutor:进程池执行器。
GeventExecutor: Gevent程序执行器。
TornadoExecutor: Tornado程序执行器。
TwistedExecutor: Twisted程序执行器。
AsyncIOExecutor: asyncio程序执行器。
八、定时任务调度配置:
这里我们用一个例子来说明。比如我想这样配置
执行器:
配置 default执行器为 ThreadPoolExecutor,并且设置最多的线程数是20个。
<
存储器:
配置 default的任务存储器为 SQLAlchemyJobStore(使用SQLite)。
<
任务配置:
设置 coalesce为 False:设置这个目的是,比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟),如果 coalesce=True,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部执行。
max_instances=5:同一个任务同一时间最多只能有5个实例在运行。比如一个耗时10分钟的job,被指定每分钟运行1次,如果我 max_instance值5,那么在第6~10分钟上,新的运行实例不会被执行,因为已经有5个实例在跑了。
那么代码如下:
九、任务操作:
1. 添加任务:
使用 scheduler.add_job(job_obj,args,id,trigger,**trigger_kwargs)。
2. 删除任务:
使用 scheduler.remove_job(job_id,jobstore=None)。
3. 暂停任务:
使用 scheduler.pause_job(job_id,jobstore=None)。
4. 恢复任务:
使用 scheduler.resume_job(job_id,jobstore=None)。
5. 修改某个任务属性信息:
使用 scheduler.modify_job(job_id,jobstore=None,**changes)。
6. 修改单个作业的触发器并更新下次运行时间:
使用 scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args)
7. 输出作业信息:
使用 scheduler.print_jobs(jobstore=None,out=sys.stdout)
十、异常监听:
当我们的任务抛出异常后,我们可以监听到,然后把错误信息进行记录。示例代码如下:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging
# 配置日志显示
以上便是 APScheduler库的详细用法了。如果我们需要在项目中开一个定时功能,完全可以选择 APScheduler,轻量又功能强大。
这次女神再也不用2点跑到公司去加班啦~
相关推荐
- 手机上怎么找qq邮箱登录(用手机怎么找到qq邮箱)
-
入口是“联系人”选项卡。qq邮箱手机在QQ主菜单中选择下方的“联系人”选项卡;3、在“联系人”中选取“公众号”选项卡;4、在公众号中菜单中找到或搜索“QQ邮箱提醒”,点击进入;5、点击“进入邮箱”;6...
- amd显卡控制面板
-
AMD显卡控制面板是用来管理你的AMD显卡的,可以在控制面板中进行设置一些简单的调整,来提升显卡性能和效果。1、先打开AMD控制面板。2、打开“垂直同步(V-SYNC)”功能,可调整细节,改善影像流畅...
- win10老是未响应卡死(window10总是未响应)
-
具体方法:1、如果win10中的应用程序出现不响应的情况,应该是应用程序加载失败了。可以通过重置方法来解决win10应用程序无响应。2、登录win10系统,用管理员身份运行Powershell(可在C...
- usb安装系统步骤(USB安装系统步骤)
-
1.准备一张U盘,将联想官网下载的系统镜像文件复制到U盘中;2.将U盘插入联想S41U电脑,重启电脑,按F12进入BIOS设置,将U盘设置为启动项;3.重启电脑,进入U盘安装界面,按提示操作,完成系统...
- win98安装教程(win98iso怎么安装)
-
如何安装windows98 一、具体安装步骤 备份好重要文件之后,就可以安装windows98了。 第一步:启动安装程序。 用户如果原来已安装了windows95/97/98,现在拟对其进行升...
- 雨林木风win7安装(雨林木风win732位安装教程)
-
安装步骤如下: 1、光盘放入光驱,复制光盘上的win7.gho和安装系统.exe到硬盘非C盘的文件夹;(gho文件名可以是其他名字,后缀为gho,体积最大的就是。) 2、双击安装系统.exe;...
- win10解绑管理员账户(win10管理员账户怎么取消开机密码)
-
要解除Windows10电脑上的管理员权限,您需要进行以下操作:1.打开“控制面板”:右键单击“开始”按钮,然后选择“控制面板”。2.进入“用户账户”:在控制面板中,选择“用户账户”。3.点击...
- win10家庭版没有组策略编辑器
-
Win10组策略编辑器找不到怎么办 解决方法 一、win10系统版本本身不提供组策略的功能。 1、运行gpedit.msc直接提示找到gpedit.msc(组策略)。 2、运行MMC,在“添加...
- tplogin管理员登录入口(tplogin重新设置密码)
-
tplogin.cn是新版tplink路由器的登录地址(管理页面地址),在浏览器中输入tplogin.cn,就可以打开tplink路由器的管理页面(登录页面)。具体的登录方法如下:1、打开电脑上的浏...
- psp模拟器怎么导入游戏(psp模拟器怎么导入游戏 Vivo手机)
-
方法如下:1、打开能操作文件的助手软件,用pp链接后点击左下文件,然后点常用目录下的程序用户,会出现ppsspp的文件夹。2、打开ppsspp文件夹,会出来四个选项文件夹,第一个进去后是psp文件夹,...
- 电脑系统怎样升级(电脑系统怎么升级)
-
电脑系统升级方法步骤,1、打开电脑,点击电脑左下角的开始菜单,在弹出的菜单选项中选择“控制面板”。2、点击“开始”,点击“控制面板”3、在控制面板中,点击“系统和安全”。4、点击启用或禁用自动更新。5...
- windows无法激活(windows无法激活有什么影响)
-
1.如果修复或重新组装了电脑,则可能是安装了不同版本的Windows。或者,如果在修复过程中为电脑使用了其他产品密钥,当使用该密钥的电脑数大于Microsoft软件许可条款允许的电脑数时,该密钥...
-
- u盘文件恢复软件免费(恢复u盘数据免费的软件)
-
u盘损坏文件恢复方法:1、打开电脑桌面的“计算机”或“我的电脑”。2、然后再找到需要修复的u盘。3、打开“运行”窗口(可以直接按“Windows+R”快捷打开),输入“CMD”并点击“确定”按钮以进入命令提符界面。4、从打开的“命令提示符”...
-
2025-12-28 22:03 off999
欢迎 你 发表评论:
- 一周热门
-
-
抖音上好看的小姐姐,Python给你都下载了
-
全网最简单易懂!495页Python漫画教程,高清PDF版免费下载
-
Python 3.14 的 UUIDv6/v7/v8 上新,别再用 uuid4 () 啦!
-
飞牛NAS部署TVGate Docker项目,实现内网一键转发、代理、jx
-
python入门到脱坑 输入与输出—str()函数
-
宝塔面板如何添加免费waf防火墙?(宝塔面板开启https)
-
Python三目运算基础与进阶_python三目运算符判断三个变量
-
(新版)Python 分布式爬虫与 JS 逆向进阶实战吾爱分享
-
失业程序员复习python笔记——条件与循环
-
系统u盘安装(win11系统u盘安装)
-
- 最近发表
- 标签列表
-
- python计时 (73)
- python安装路径 (56)
- python类型转换 (93)
- python进度条 (67)
- python吧 (67)
- python的for循环 (65)
- python格式化字符串 (61)
- python静态方法 (57)
- python列表切片 (59)
- python面向对象编程 (60)
- python 代码加密 (65)
- python串口编程 (77)
- python封装 (57)
- python写入txt (66)
- python读取文件夹下所有文件 (59)
- python操作mysql数据库 (66)
- python获取列表的长度 (64)
- python接口 (63)
- python调用函数 (57)
- python多态 (60)
- python匿名函数 (59)
- python打印九九乘法表 (65)
- python赋值 (62)
- python异常 (69)
- python元祖 (57)
