Python使用APScheduler實(shí)現(xiàn)定時任務(wù)過程解析
前言
APScheduler是基于Quartz的一個Python定時任務(wù)框架。提供了基于日期、固定時間間隔以及crontab類型的任務(wù),并且可以持久化任務(wù)。
在線文檔:https://apscheduler.readthedocs.io/en/latest/userguide.html
一、安裝APScheduler
pip install apscheduler
二、基本概念
APScheduler有四大組件:
1、觸發(fā)器 triggers :
觸發(fā)器包含調(diào)度邏輯。每個作業(yè)都有自己的觸發(fā)器,用于確定下一個任務(wù)何時運(yùn)行。除了初始配置之外,觸發(fā)器是完全無狀態(tài)的。
有三種內(nèi)建的trigger:
- (1)date: 特定的時間點(diǎn)觸發(fā)
- (2)interval: 固定時間間隔觸發(fā)
- (3)cron: 在特定時間周期性地觸發(fā)
2、任務(wù)儲存器 job stores:用于存放任務(wù),把任務(wù)存放在內(nèi)存(為默認(rèn)MemoryJobStore)或數(shù)據(jù)庫中。
3、執(zhí)行器 executors: 執(zhí)行器是將任務(wù)提交到線程池或進(jìn)程池中運(yùn)行,當(dāng)任務(wù)完成時,執(zhí)行器通知調(diào)度器觸發(fā)相應(yīng)的事件。
4、調(diào)度器 schedulers: 把上方三個組件作為參數(shù),通過創(chuàng)建調(diào)度器實(shí)例來運(yùn)行
根據(jù)開發(fā)需求選擇相應(yīng)的組件,下面是不同的調(diào)度器組件:
- BlockingScheduler 阻塞式調(diào)度器:適用于只跑調(diào)度器的程序。
- BackgroundScheduler 后臺調(diào)度器:適用于非阻塞的情況,調(diào)度器會在后臺獨(dú)立運(yùn)行。
- AsyncIOScheduler AsyncIO調(diào)度器,適用于應(yīng)用使用AsnycIO的情況。
- GeventScheduler Gevent調(diào)度器,適用于應(yīng)用通過Gevent的情況。
- TornadoScheduler Tornado調(diào)度器,適用于構(gòu)建Tornado應(yīng)用。
- TwistedScheduler Twisted調(diào)度器,適用于構(gòu)建Twisted應(yīng)用。
- QtScheduler Qt調(diào)度器,適用于構(gòu)建Qt應(yīng)用。
三、使用步驟
1、新建一個調(diào)度器schedulers
2、添加調(diào)度任務(wù)
3、運(yùn)行調(diào)度任務(wù)
四、使用實(shí)例
1、觸發(fā)器date
特定的時間點(diǎn)觸發(fā),只執(zhí)行一次。參數(shù)如下:
| 參數(shù) | 說明 |
| run_date (datetime 或 str) | 作業(yè)的運(yùn)行日期或時間 |
| timezone (datetime.tzinfo 或 str) | 指定時區(qū) |
使用例子:
from datetime import datetime from datetime import date from apscheduler.schedulers.blocking import BlockingScheduler def job(text): print(text) scheduler = BlockingScheduler() # 在 2019-8-30 運(yùn)行一次 job 方法 scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1']) # 在 2019-8-30 01:00:00 運(yùn)行一次 job 方法 scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2']) # 在 2019-8-30 01:00:01 運(yùn)行一次 job 方法 scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3']) scheduler.start()
2、觸發(fā)器interval
固定時間間隔觸發(fā)。參數(shù)如下:
| 參數(shù) | 說明 |
| weeks (int) | 間隔幾周 |
| days (int) | 間隔幾天 |
| hours (int) | 間隔幾小時 |
| minutes (int) | 間隔幾分鐘 |
| seconds (int) | 間隔多少秒 |
| start_date (datetime 或 str) | 開始日期 |
| end_date (datetime 或 str) | 結(jié)束日期 |
| timezone (datetime.tzinfo 或str) |
使用例子:
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('{} --- {}'.format(text, t))
scheduler = BlockingScheduler()
# 每隔 1分鐘 運(yùn)行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, args=['job1'])
# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期間,每隔1分30秒 運(yùn)行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, seconds = 30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2'])
scheduler.start()
'''
運(yùn)行結(jié)果:
job2 --- 2019-08-29 22:15:00
job1 --- 2019-08-29 22:15:46
job2 --- 2019-08-29 22:16:30
job1 --- 2019-08-29 22:16:46
job1 --- 2019-08-29 22:17:46
...余下省略...
'''
3、觸發(fā)器cron
在特定時間周期性地觸發(fā)。參數(shù)如下:
| 參數(shù) | 說明 |
| year (int 或 str) | 年,4位數(shù)字 |
| month (int 或 str) | 月 (范圍1-12) |
| day (int 或 str) | 日 (范圍1-31) |
| week (int 或 str) | 周 (范圍1-53) |
| day_of_week (int 或 str) | 周內(nèi)第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun) |
| hour (int 或 str) | 時 (范圍0-23) |
| minute (int 或 str) | 分 (范圍0-59) |
| second (int 或 str) | 秒 (范圍0-59) |
| start_date (datetime 或 str) | 最早開始日期(包含) |
| end_date (datetime 或 str) | 最晚結(jié)束時間(包含) |
| timezone (datetime.tzinfo 或str) | 指定時區(qū) |
這些參數(shù)支持算數(shù)表達(dá)式,取值格式有如下:

使用例子:
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('{} --- {}'.format(text, t))
scheduler = BlockingScheduler()
# 在每天22點(diǎn),每隔 1分鐘 運(yùn)行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在每天22和23點(diǎn)的25分,運(yùn)行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])
scheduler.start()
'''
運(yùn)行結(jié)果:
job1 --- 2019-08-29 22:25:00
job2 --- 2019-08-29 22:25:00
job1 --- 2019-08-29 22:26:00
job1 --- 2019-08-29 22:27:00
...余下省略...
'''
4、通過裝飾器scheduled_job()添加方法
添加任務(wù)的方法有兩種:
(1)通過調(diào)用add_job()---見上面1至3代碼
(2)通過裝飾器scheduled_job():
第一種方法是最常用的方法。第二種方法主要是方便地聲明在應(yīng)用程序運(yùn)行時不會更改的任務(wù)。該 add_job()方法返回一個apscheduler.job.Job實(shí)例,可以使用該實(shí)例稍后修改或刪除該任務(wù)。
import time
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('interval', seconds=5)
def job1():
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('job1 --- {}'.format(t))
@scheduler.scheduled_job('cron', second='*/7')
def job2():
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('job2 --- {}'.format(t))
scheduler.start()
'''
運(yùn)行結(jié)果:
job2 --- 2019-08-29 22:36:35
job1 --- 2019-08-29 22:36:37
job2 --- 2019-08-29 22:36:42
job1 --- 2019-08-29 22:36:42
job1 --- 2019-08-29 22:36:47
job2 --- 2019-08-29 22:36:49
...余下省略...
'''
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python2.7讀取文件夾下所有文件名稱及內(nèi)容的方法
python,本身來說是一門高級編程語言,python它入門簡單,有基礎(chǔ)的學(xué)起來很快就能有簡單的應(yīng)用,但是在非常高的抽象計(jì)算中,高級的python程序設(shè)計(jì)也是非常難學(xué)的。接下來給大家介紹python2.7讀取文件夾下所有文件名稱及內(nèi)容的方法,一起看看吧2018-02-02
Python函數(shù)參數(shù)類型及排序原理總結(jié)
這篇文章主要介紹了Python函數(shù)參數(shù)類型及排序原理總結(jié),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12
Python實(shí)現(xiàn)快速將pdf文件剪切成多個圖片
這篇文章主要為大家詳細(xì)介紹了如何使用Python實(shí)現(xiàn)快速將pdf文件剪切成多個圖片,文中的示例代碼講解詳細(xì),有需要的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-01-01
Python實(shí)現(xiàn)線性擬合及繪圖的示例代碼
在數(shù)據(jù)處理和繪圖中,我們通常會遇到直線或曲線的擬合問題,本文主要介紹了Python實(shí)現(xiàn)線性擬合及繪圖的示例代碼,具有一定的參考價值,感興趣的可以了解一下2024-04-04
Python將list保存到文件的3種方法實(shí)例代碼
這篇文章主要給大家介紹了關(guān)于Python將list保存到文件的3種方法,Python中提供了文件操作的功能,可以通過打開和讀寫文件實(shí)現(xiàn),文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-02-02
python實(shí)現(xiàn)簡單倒計(jì)時功能
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)簡單倒計(jì)時功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-04-04

