django-celery-beat搭建定時任務(wù)的實現(xiàn)
一、創(chuàng)建django項目和app
1、安裝定時任務(wù)第三方包
pip install django-celery-beat # 插件用來動態(tài)配置定時任務(wù),一般會配合 django_celery_results 一起使用,所以一起安裝 django_celery_results
pip install django_celery_results pip install eventlet # windows下運行celery 4以后版本,還需額外安裝eventlet庫
2、創(chuàng)建django項目并創(chuàng)建一個使用定時任務(wù)的app
1.1創(chuàng)建django項目并創(chuàng)建app
創(chuàng)建的過程省略,不在這里展開,需要注意的是setting文件注冊app的配置如下:
INSTALLED_APPS = [
......
'myapp', # 剛創(chuàng)建的使用定時任務(wù)的app
'django_celery_beat', # 插件用來動態(tài)配置定時任務(wù),只要進行了第一步pip安裝就可以直接注冊了
'django_celery_results',
]
1.2 創(chuàng)建定時任務(wù)數(shù)據(jù)庫
依次執(zhí)行: python manage.py makemigrations 和 python manage.py migrate
打開數(shù)據(jù)庫發(fā)現(xiàn),自動創(chuàng)建了一些表如下:

這些都是定時任務(wù)需要的表格,自動創(chuàng)建不需要手動管理
django_celery_beat.models.ClockedSchedule # 特定時刻任務(wù) django_celery_beat.models.CrontabSchedule # 特定時間表任務(wù),例如每周1運行的計劃 django_celery_beat.models.IntervalSchedule # 以特定間隔(例如,每5秒)運行的計劃。 django_celery_beat.models.PeriodicTask # 此模型定義要運行的單個周期性任務(wù)。 django_celery_beat.models.PeriodicTasks # 此模型僅用作索引以跟蹤計劃何時更改 django_celery_beat.models.SolarSchedule # 定制任務(wù)
如果安裝注冊了django_celery_results 還會有另外三個表:

2、新建一個celery.py文件
文件的作用是指定django環(huán)境、創(chuàng)建Celery app和指定Celery配置文件的啟動位置,類似與wsgi.py或asgi.py,因此也建議文件創(chuàng)建位置與wsgi.py或asgi.py同級。
文件內(nèi)容如下:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
# 設(shè)置django環(huán)境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lc_manage.settings.settings')
# 創(chuàng)建一個Celery app
app = Celery('djangotask')
platforms.C_FORCE_ROOT = True ? # 如果配置沒有生效需要在啟動時設(shè)置 ?export C_FORCE_ROOT="true"
# ?使用CELERY_ 作為前綴,在celeryconfig.py中寫配置
app.config_from_object('lc_manage.celeryconfig', namespace="CELERY")
# app.config_from_object('lc_manage.celeryconfig')
# 發(fā)現(xiàn)任務(wù)文件每個app下的tasks.py
app.autodiscover_tasks()3、創(chuàng)建配置文件config.py
這個文件里的內(nèi)容可以寫到項目的 settings.py里面,因為上面的celery.py 中可以指定配置文件位置;不過內(nèi)容比較多,還是建議單獨創(chuàng)建一個配置文件celeryconfig.py,可以與celery.py 同級目錄,文件內(nèi)容如下:
from __future__ import absolute_import # broker 設(shè)置 指定中間代理人將任務(wù)存到哪里,這里是redis的11號庫 CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/11' # 指定 Backend 儲存結(jié)果的地方,可以使用django數(shù)據(jù)庫(django-db),也可以使用redis, # 使用django數(shù)據(jù)庫(django-db),以后運行worker就會保存到數(shù)據(jù)庫中,可以通過ORM進行訪問 # CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND = 'django-db' # 使用django_celery_beat插件用來動態(tài)配置任務(wù) CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 指定時區(qū),默認是 UTC CELERY_TIMEZONE = 'Asia/Shanghai' # celery 序列化與反序列化配置 CELERY_TASK_SERIALIZER = 'pickle' CELERY_RESULT_SERIALIZER = 'pickle' CELERY_ACCEPT_CONTENT = ['pickle', 'json'] CELERY_TASK_IGNORE_RESULT = True # 有些情況下可以防止死鎖 ?非常重要! CELERYD_FORCE_EXECV = True # 為存儲結(jié)果設(shè)置過期日期,默認1天過期。如果beat開啟,Celery每天會自動清除。 # 設(shè)為0,存儲結(jié)果永不過期 # CELERY_RESULT_EXPIRES = xx # CELERY_TASK_RESULT_EXPIRES = 60*60*24 ?# 后端存儲的任務(wù)超過一天時,自動刪除數(shù)據(jù)庫中的任務(wù)數(shù)據(jù),單位秒 CELERY_MAX_TASKS_PER_CHILD = 1000 ?# 每個worker執(zhí)行1000次任務(wù)后,自動重啟worker,防止任務(wù)占用太多內(nèi)存導致內(nèi)存泄漏 # 禁用所有速度限制,如果網(wǎng)絡(luò)資源有限,不建議開足馬力。 CELERY_DISABLE_RATE_LIMITS = True # celery beat配置(周期性任務(wù)設(shè)置) CELERY_ENABLE_UTC = False # 官方用來修復CELERY_ENABLE_UTC=False and USE_TZ = False 時時間比較錯誤的問題; # 詳情見:https://github.com/celery/django-celery-beat/pull/216/files DJANGO_CELERY_BEAT_TZ_AWARE = False
這里需要注意的是,如果在celery.py中配置指定了confiig配置文件使用CELERY前綴:app.config_from_object(‘lc_manage.celeryconfig’, namespace=“CELERY”),那么celeryconfig.py配置文件的參數(shù)都應(yīng)加:CELERY_,當然你也可以不用第二個參數(shù),namespace=“CELERY"寫成app.config_from_object(‘lc_manage.celeryconfig’”), 那么celeryconfig.py中就不需要加CELERY_ 前綴,注意一定要統(tǒng)一?。。》駝t可能 會報錯:
consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
4、加載celery.py
我們自己創(chuàng)建的celery.py雖然與wsgi.py 或者 asgi.py等同級,但是 不會像他們一樣自動加載,需要我們通過本級文件下的__init__.py 把celery.py 加載進來,打開__init__.py文件,添加如下內(nèi)容:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
# 使得django啟動時加載celery的app
__all__ = ('celery_app',)
5、創(chuàng)建定時任務(wù)執(zhí)行內(nèi)容
經(jīng)過上面的配置,django-celery_beta 會自動去掃描每個app目錄下是否有 tasks.py 文件,需要創(chuàng)建定時任務(wù)的app下我們可以手動創(chuàng)建tasks.py,定時任務(wù)就寫在這個文件上:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
? ? print("x + y ?= ?", x + y)
? ? return x + y
@shared_task
def mul(x, y):
? ? print("x * y ?= ?", x * y)
? ? return x * y二、定時器創(chuàng)建和定時任務(wù)添加
1、時間和周期控制:IntervalSchedule、ClockedSchedule和CrontabSchedule
其他帖子都把上面三個稱為定時任務(wù)與PeriodicTask放一起結(jié)束,但是個人理解以上三個都是定時任務(wù)時控制時間和周期執(zhí)行的控制器,并非創(chuàng)建定時任務(wù),真正創(chuàng)建定時任務(wù)的只有PeriodicTask,所以這里個人把這三個稱為定時任務(wù)的“時間和頻率控制器”。
- IntervalSchedule 按時間間隔頻率執(zhí)行定時任務(wù)的控制器,(例:每間隔1H/1M/…執(zhí)行一次)
- ClockedSchedule 指定某個時刻執(zhí)行定時任務(wù)的控制器, (例:2018年8月8號 8:00這個時刻執(zhí)行)
- CrontabSchedule 指定某個時間執(zhí)行定時任務(wù)的控制器 (例:每年的12月星期一的8:30)
導入這四個模塊:
from django_celery_beat.models import CrontabSchedule, PeriodicTask, IntervalSchedule,ClockedSchedule
1.1 IntervalSchedule 時間間隔控制器
參數(shù):every 間隔數(shù),period 間隔單位
schedule, created = IntervalSchedule.objects.update_or_create(
every=1,
period=IntervalSchedule.MINUTES) # 按分鐘間隔執(zhí)行
第二個參數(shù)可選
- IntervalSchedule.DAYS 固定間隔天數(shù)
- IntervalSchedule.HOURS 固定間隔小時數(shù)
- IntervalSchedule.MINUTES 固定間隔分鐘數(shù)
- IntervalSchedule.SECONDS 固定間隔秒數(shù)
- IntervalSchedule.MICROSECONDS 固定間隔微秒
返回值可直接解包,其實只有第一個參數(shù)schedule有用,在PeriodicTask創(chuàng)建定時任務(wù)時作為時間和周期控制參數(shù)傳入。解包獲得的第二個數(shù)據(jù)created 可以直接用下劃線取代
1.2 ClockedSchedule特定時刻定時器
參數(shù):clocked_time 指定時間
clocked, _ = ClockedSchedule.objects.update_or_create(
clocked_time =datetime.strptime("2020-08-18 16:58:46","%Y-%m-%d %H:%M:%S")) # 按指定時間執(zhí)行
這里第二個參數(shù)直接用下劃線取代。特定時刻控制一般時執(zhí)行一次,適合在view中調(diào)用執(zhí)行一次性計劃。
1.3、周期性任務(wù) CrontabSchedule
參數(shù):
- month_of_year # 幾月執(zhí)行
- day_of_month # 幾號執(zhí)行
- day_of_week # 周幾執(zhí)行
- hour # 幾點執(zhí)行
- minute # 幾分執(zhí)行
- timezone # 時區(qū)
crontab, _ = CrontabSchedule.objects.update_or_create(
minute="00",
hour="23",
day_of_week="*", # 周幾執(zhí)行
day_of_month='1', # 幾點執(zhí)行
month_of_year='*',
timezone=pytz.timezone("Asia/Shanghai"),
)
上面的星號代表不使用,上面的配置即: 每月1日的23點執(zhí)行一次。如果day_of_month=“*”則代表每天23點執(zhí)行。
2、動態(tài)添加任務(wù) PeriodicTask
參數(shù):
- name:任務(wù)名
- task:指定的任務(wù)
- interval:時間間隔
- crontab:時間控制器
- clocked:指定時刻控制器
- expires:有效日期
- one_off:啟用狀態(tài)(如果為True,調(diào)度將只運行該任務(wù)一次)
- start_time:開始時間
- enabled:啟用
- last_run_at:最后運行時間
- total_run_count:運行總次數(shù)
- date_changed:最后的更改時間
- description:描述
- args: 傳參
注意:
1、interval、crontab、clocked三選一,不可同時使用。參數(shù)值分別對應(yīng)interval:schedule(IntervalSchedule的第一個返回值);crontab:crontab(CrontabSchedule的第一個返回值);clocked:clocked(ClockedSchedule返回值)
2、name :任務(wù)名,確保其唯一性?。。?!
3、task:后面是以字符串形式調(diào)用定時任務(wù)的具體工作內(nèi)容函數(shù),即第一項的第5條用@shared_task裝飾器創(chuàng)建的方法。
4、enabled:是否啟用,這里動態(tài)創(chuàng)建的話一般設(shè)為true
5、args: 傳參(task中指定的任務(wù)需要傳參時使用),注意需要json序列化 json.dumps(…)
其他參數(shù)均為非必填項。
PeriodicTask.objects.update_or_create(
name=working_point_record_id + 'working_time_1',
defaults={
"task": "apps.tasks.add",
"crontab": crontab,
"enabled": True,
"args": json.dumps([working_point_record_id])
},
4、封裝方法
一般情況下,可以把IntervalSchedule、ClockedSchedule和CrontabSchedule根據(jù)業(yè)務(wù)需求單獨封裝一個文件,而PeriodicTask.objects直接在對應(yīng)view視圖中調(diào)用即可。
三、啟動定時任務(wù)beat
定時任務(wù)是獨立與django項目運行的,django只是定時任務(wù)的入口和操作數(shù)據(jù)庫的入口,而這前提是django-celery-beat 已經(jīng)獨立啟動,django-celery-beat的啟動分兩步,一是生產(chǎn)者單獨啟動(beat),而是工作者單獨啟動(worker),這里啟動順序需要注意一點,建議先啟動worker 再啟動beat ,曾經(jīng)遇到先啟動beat有可能beat會啟動失敗。
1、啟動工作者worker
此時仍然需要重新打開一個命令窗口,進入django項目的根目錄(manage.py同級目錄)下:
# Linux下測試,啟動Celery Celery -A 【項目名稱】worker -l info ? # Windows下測試,啟動Celery Celery -A 【項目名稱】worker -l info -P eventlet ? # 如果Windows下Celery不工作,輸入如下命令 Celery -A 【項目名稱】worker -l info --pool=solo
2、啟動生產(chǎn)者beat
beat 是一個生產(chǎn)者角色,是單獨運行,生產(chǎn)者完全不依賴django,需要與django在一個不同的命令窗口運行,但啟動時需要先進入django項目的根目錄,即與manage.py在同一目錄下:
celery -A 【項目名稱】 beat -l info
四、定時任務(wù)創(chuàng)建
這里要劃重點了,其實看完上面的已經(jīng)可以使用了,下面的屬于優(yōu)化選擇性理解,看個人理解能力和需求:
一般情況下,定時器創(chuàng)建和定時任務(wù)添加 PeriodicTask會在view視圖中創(chuàng)建,但是如果view視圖比較頻繁,那么每次執(zhí)行view都創(chuàng)建一個定時任務(wù)的話會有無數(shù)個,會比較占用內(nèi)存資源,當然可以在celeryconfig.py設(shè)置執(zhí)行多少次后重啟任務(wù),高并發(fā)下view會不會導致頻繁重啟,這里需要根據(jù)業(yè)務(wù)邏輯把@shared_task包裹下的任務(wù)處理方法做成批量處理放到單獨文件中(不需要在view視圖中調(diào)用,定時調(diào)用批處理,那么就是一個任務(wù)批量處理數(shù)據(jù)而已),在間隔固定時間下執(zhí)行,單純語言難以表達清楚,理解的就理解了,不理解的就慢慢體會吧。這里需要畫重點的是:如果你理解了單個文件寫@shared_task批處理方法,那么你的問題重點就是:我如何啟動它呢?難道每次在啟動beat后在命令行啟動嗎?雖然也可以,但是如果項目需要創(chuàng)建的定時任務(wù)很多怎么辦,不用多,幾百行可以了,每次在命令行復制粘貼執(zhí)行命令 嗎?當然不要,其實直接創(chuàng)建一個單獨的python文件,然后再根目錄下的url中導入即可,因為url在項目啟動的時候會自動加載一次,也就相當于啟動django項目的時候就自動創(chuàng)建了一次定時任務(wù)!
到此這篇關(guān)于django-celery-beat搭建定時任務(wù)的實現(xiàn)的文章就介紹到這了,更多相關(guān)django-celery-beat 定時任務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python實現(xiàn)Appium端口檢測與釋放的實現(xiàn)
這篇文章主要介紹了Python實現(xiàn)Appium端口檢測與釋放的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-12-12
Python報錯:OSError:?[Errno?22]?Invalid?argument解決方案及應(yīng)用實例
最近跑別人的項目遇到一個這樣的問題一開始以為是沒有用管理員的權(quán)限運行,導致創(chuàng)建不了日志文件后來發(fā)現(xiàn)是和windows的命名規(guī)則沖突了,這篇文章主要給大家介紹了關(guān)于Python報錯:OSError:?[Errno?22]?Invalid?argument的解決方案及應(yīng)用實例,需要的朋友可以參考下2024-07-07
Python中的JSON?Pickle?Shelve模塊特性與區(qū)別實例探究
在Python中,處理數(shù)據(jù)序列化和持久化是極其重要的,JSON、Pickle和Shelve是三種常用的模塊,它們提供了不同的方法來處理數(shù)據(jù)的序列化和持久化,本文將深入研究這三個模塊,探討它們的特性、用法以及各自的優(yōu)缺點2024-01-01
Python數(shù)據(jù)清洗之抽取jsonl文件數(shù)據(jù)字段并合并
這篇文章主要為大家詳細介紹了Python數(shù)據(jù)清洗之抽取jsonl文件數(shù)據(jù)字段并合并的相關(guān)知識,文中的示例代碼講解詳細,感興趣的小伙伴可以了解下2025-03-03
transforms.Compose()函數(shù)的使用及說明
這篇文章主要介紹了transforms.Compose()函數(shù)的使用及說明,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08

