Django中使用Celery的方法示例
起步
在 《分布式任務(wù)隊(duì)列Celery使用說明》 中介紹了在 Python 中使用 Celery 來實(shí)驗(yàn)異步任務(wù)和定時(shí)任務(wù)功能。本文介紹如何在 Django 中使用 Celery。
安裝
pip install django-celery
這個(gè)命令使用的依賴是 Celery 3.x 的版本,所以會(huì)把我之前安裝的 4.x 卸載,不過對(duì)功能上并沒有什么影響。我們也完全可以僅用Celery在django中使用,但使用 django-celery 模塊能更好的管理 celery。
使用
可以把有關(guān) Celery 的配置放到 settings.py 里去,但我比較習(xí)慣單獨(dú)一個(gè)文件來放,然后在 settings.py 引入進(jìn)來:
# celery_config.py
import djcelery
import os
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
# UTC
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_IMPORTS = (
'app.tasks',
)
# 有些情況可以防止死鎖
CELERY_FORCE_EXECV = True
# 設(shè)置并發(fā)的worker數(shù)量
CELERYD_CONCURRENCY = 4
# 任務(wù)發(fā)送完成是否需要確認(rèn),這一項(xiàng)對(duì)性能有一點(diǎn)影響
CELERY_ACKS_LATE = True
# 每個(gè)worker執(zhí)行了多少任務(wù)就會(huì)銷毀,防止內(nèi)存泄露,默認(rèn)是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40
# 規(guī)定完成任務(wù)的時(shí)間
CELERYD_TASK_TIME_LIMIT = 15 * 60 # 在15分鐘內(nèi)完成任務(wù),否則執(zhí)行該任務(wù)的worker將被殺死,任務(wù)移交給父進(jìn)程
# 設(shè)置默認(rèn)的隊(duì)列名稱,如果一個(gè)消息不符合其他的隊(duì)列就會(huì)放在默認(rèn)隊(duì)列里面,如果什么都不設(shè)置的話,數(shù)據(jù)都會(huì)發(fā)送到默認(rèn)的隊(duì)列中
CELERY_DEFAULT_QUEUE = "default"
# 設(shè)置詳細(xì)的隊(duì)列
CELERY_QUEUES = {
"default": { # 這是上面指定的默認(rèn)隊(duì)列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"beat_queue": {
"exchange": "beat_queue",
"exchange_type": "direct",
"routing_key": "beat_queue"
}
}
配置文件中設(shè)置了 CELERY_IMPORTS 導(dǎo)入的任務(wù),所以在django app中創(chuàng)建相應(yīng)的任務(wù)文件:
# app/tasks.py
from celery.task import Task
import time
class TestTask(Task):
name = 'test-task' # 給任務(wù)設(shè)置個(gè)自定義名稱
def run(self, *args, **kwargs):
print('start test task')
time.sleep(4)
print('args={}, kwargs={}'.format(args, kwargs))
print('end test task')
在 settings.py 添加:
INSTALLED_APPS = [ # ... 'djcelery', ] # Celery from learn_django.celery_config import *
觸發(fā)任務(wù)或提交任務(wù)可以在view中來調(diào)用:
# views.py
from django.http import HttpResponse
from app.tasks import TestTask
def test_task(request):
# 執(zhí)行異步任務(wù)
print('start do request')
t = TestTask()
t.delay()
print('end do request')
return HttpResponse('ok')
啟動(dòng) woker 的命令是:
python manage.py celery worker -l info
再啟動(dòng)django,訪問該view,可以看到任務(wù)在worker中被消費(fèi)了。
定時(shí)任務(wù)
在celery的配置文件 celery_config.py 文件中添加:
CELERYBEAT_SCHEDULE = {
'task1-every-1-min': { # 自定義名稱
'task': 'test-task', # 與任務(wù)中name名稱一致
'schedule': datetime.timedelta(seconds=5),
'args': (2, 15),
'options': {
'queue': 'beat_queue', # 指定要使用的隊(duì)列
}
},
}
通過 options 的 queque 來指定要使用的隊(duì)列,這里需要單獨(dú)的隊(duì)列是因?yàn)?,如果所有任?wù)都使用同一隊(duì)列,對(duì)于定時(shí)任務(wù)來說,任務(wù)提交后會(huì)位于隊(duì)列尾部,任務(wù)的執(zhí)行時(shí)間會(huì)靠后,所以對(duì)于定時(shí)任務(wù)來說,使用單獨(dú)的隊(duì)列。
啟動(dòng) beat:
python manage.py celery beat -l info
監(jiān)控工具 flower
如果celery中的任務(wù)執(zhí)行失敗了,有些場景是需要對(duì)這些任務(wù)進(jìn)行監(jiān)控, flower 是基于 Tornado 開發(fā)的web應(yīng)用。安裝用 pip install flower ;啟動(dòng)它可以是:
python manage.py celery flower # python manage.py celery flower --basic_auth=admin:admin
用瀏覽器訪問 http://localhost:5555 即可查看:

以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python?clip與range函數(shù)保姆級(jí)使用教程
本文主要和大家介紹了詳解Python中clip與range函數(shù)的用法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參,希望能幫助到大家2022-06-06
TensorFlow安裝及jupyter notebook配置方法
下面小編就為大家?guī)硪黄猅ensorFlow安裝及jupyter notebook配置方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-09-09
如何用python獲取EXCEL文件內(nèi)容并保存到DBC
很多時(shí)候,使用python進(jìn)行數(shù)據(jù)分析的第一步就是讀取excel文件,下面這篇文章主要給大家介紹了關(guān)于如何用python獲取EXCEL文件內(nèi)容并保存到DBC的相關(guān)資料,需要的朋友可以參考2023-12-12
Python數(shù)據(jù)類型中的“冒號(hào)“[::]——分片與步長操作示例
這篇文章主要介紹了Python數(shù)據(jù)類型中的“冒號(hào)“[::]——分片與步長操作,結(jié)合實(shí)例形式分析了Python基本數(shù)據(jù)類型中的分片與步長使用方法及相關(guān)操作技巧,需要的朋友可以參考下2018-01-01
Django異步任務(wù)線程池實(shí)現(xiàn)原理
這篇文章主要介紹了Django異步任務(wù)線程池實(shí)現(xiàn)原理,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12

