詳解Django中異步任務(wù)之django-celery
Celery文檔參考:http://docs.jinkan.org/docs/celery/
參考文章:http://www.dhdzp.com/article/158046.htm
Django中異步任務(wù)---django-celery
Celery簡(jiǎn)單介紹:
celery使用場(chǎng)景:
- 耗時(shí)任務(wù)定時(shí)任務(wù)
- 請(qǐng)求結(jié)果不怎么重要的
- 耗時(shí)任務(wù)比如:發(fā)送短信驗(yàn)證碼我們可以先發(fā)送給客戶(hù)任務(wù)狀態(tài)(請(qǐng)求成功或失?。?/li>
- 請(qǐng)求結(jié)果重要的建議使用django實(shí)現(xiàn) 比如:支付
首先簡(jiǎn)單介紹一下,Celery 是一個(gè)強(qiáng)大的分布式任務(wù)隊(duì)列,它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機(jī)上運(yùn)行。我們通常使用它來(lái)實(shí)現(xiàn)異步任務(wù)(asynctask)和定時(shí)任務(wù)(crontab)。它的架構(gòu)組成如下圖

Celery 主要包含以下幾個(gè)模塊:
任務(wù)模塊 Task
包含異步任務(wù)和定時(shí)任務(wù)。其中,異步任務(wù)通常在業(yè)務(wù)邏輯中被觸發(fā)并發(fā)往任務(wù)隊(duì)列,而定時(shí)任務(wù)由 Celery Beat 進(jìn)程周期性地將任務(wù)發(fā)往任務(wù)隊(duì)列。
消息中間件 Broker
Broker,即為任務(wù)調(diào)度隊(duì)列,接收任務(wù)生產(chǎn)者發(fā)來(lái)的消息(即任務(wù)),將任務(wù)存入隊(duì)列。Celery 本身不提供隊(duì)列服務(wù),官方推薦使用 RabbitMQ 和 Redis 等。
任務(wù)執(zhí)行單元 Worker
Worker 是執(zhí)行任務(wù)的處理單元,它實(shí)時(shí)監(jiān)控消息隊(duì)列,獲取隊(duì)列中調(diào)度的任務(wù),并執(zhí)行它。
任務(wù)結(jié)果存儲(chǔ) Backend
Backend 用于存儲(chǔ)任務(wù)的執(zhí)行結(jié)果,以供查詢(xún)。同消息中間件一樣,存儲(chǔ)也可使用 RabbitMQ, Redis 和 MongoDB 等。
p>django-celery
首先需要統(tǒng)一一下使用的環(huán)境,以為如果redis的版本過(guò)高會(huì)報(bào)錯(cuò)

解決方法:建議降低redis版本
推薦版本
Django == 2.2.6
django-celery == 3.3.1
django-redis == 4.11.0
redis == 2.10.6
celery == 3.1.26.post2
依賴(lài)安裝:pip install .....
人都知道
1.修改setting.py django配置文件,增加如下:
import djcelery ###導(dǎo)包 djcelery.setup_loader() ### BROKER_URL = 'redis://127.0.0.1:6379/2' # BROKER_URL='redis://192.168.217.77:16379/2' #任何可用的redis都可以,不一定要在django server運(yùn)行的主機(jī)上 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' ### INSTALLED_APPS = [ ... "djcelery", # 加入djcelery應(yīng)用 ... ] CELERY_TIMEZONE='Asia/Shanghai' #并沒(méi)有北京時(shí)區(qū),與下面TIME_ZONE應(yīng)該一致 TIME_ZONE='Asia/Shanghai' #
開(kāi)頭增加如上配置文件,根據(jù)實(shí)際情況配置redis的地址和端口,時(shí)區(qū)一定要設(shè)置為Asia/Shanghai。否則時(shí)間不準(zhǔn)確回影響定時(shí)任務(wù)的運(yùn)行。
上面代碼首先導(dǎo)出djcelery模塊,并調(diào)用setup_loader方法加載有關(guān)配置;注意配置時(shí)區(qū),不然默認(rèn)使用UTC時(shí)間會(huì)比東八區(qū)慢8個(gè)小時(shí)。其中INSTALLED_APPS末尾添加兩項(xiàng),分別表示添加celery服務(wù)和自己定義的apps服務(wù)。
2.創(chuàng)建Celery所需的數(shù)據(jù)表
python manage.py migrate #如若不成功可以嘗試一下命令語(yǔ)句 #python manage.py syncdb
3.創(chuàng)建task

stasks.py
# -*- coding: utf-8 -*-
import json, time
from syl.settings import ALY_ACCESSKEY_ID, ALY_ACCESSKEY_SECRET
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from celery import task
# 阿里云短信驗(yàn)證碼
@task
def Celery_Send_Sms(phone, data):
client = AcsClient(ALY_ACCESSKEY_ID, ALY_ACCESSKEY_SECRET, 'cn-hangzhou')
request = CommonRequest()
request.set_accept_format('json')
request.set_domain('dysmsapi.aliyuncs.com')
request.set_method('POST')
request.set_protocol_type('https') # https | http
request.set_version('2017-05-25')
request.set_action_name('SendSms')
request.add_query_param('RegionId', "cn-hangzhou")
request.add_query_param('PhoneNumbers', phone)
request.add_query_param('SignName', "美多商城")
request.add_query_param('TemplateCode', "SMS_205397849")
request.add_query_param('TemplateParam', data)
response = client.do_action(request)
time.sleep(10)
print(str(response, encoding='utf-8'))
res = json.loads(str(response, encoding='utf-8'))
#celery運(yùn)行命令
# python manage.py celery worker --loglevel=info
- settings.py中的djcelery.setup_loader()運(yùn)行時(shí), Celery便會(huì)查看所有INSTALLED_APPS中app目錄中的tasks.py文件, 找到標(biāo)記為task的function, 并將它們注冊(cè)為celery task.
- 在執(zhí)行djcelery.setup_loader()時(shí), task是以INSTALLED_APPS中的app名, 加.tasks.function_name注冊(cè)的
- 一次需要注意 在impprt task時(shí), 需要保持一致
- 如果我們由于python path不同而使用不同的引用方式時(shí)(例如在tasks.py中使用from myproject.myapp.tasks import add形式), Celery將無(wú)法得知這是同一task, 因此可能會(huì)引起奇怪的bug。
讓任務(wù)變成異步
例如我們希望在用戶(hù)發(fā)出request后異步執(zhí)行該task, 馬上返回response, 從而不阻塞該request, 使用戶(hù)有一個(gè)流暢的訪(fǎng)問(wèn)過(guò)程. 那么, 我們可以使用.delay。

views.py
import re
import random
from rest_framework.permissions import AllowAny
from django_redis import get_redis_connection
from rest_framework.views import APIView
from rest_framework.response import Response
# from utils.MyBaseView import send_message, Send_Sms
from verifications.stasks import Celery_Send_Sms
# 用戶(hù)注冊(cè)短信驗(yàn)證碼
class SmsCodeView(APIView):
'''使用apiview的限流'''
# 1. 所有人可以訪(fǎng)問(wèn)
permission_classes = (AllowAny,)
def post(self, request):
# 1. 獲取參數(shù)
phone = request.data.get('phone') # 手機(jī)號(hào)
image_code = request.data.get('image_code') # 字符串驗(yàn)證碼
image_code_uuid = request.data.get('image_code_uuid') # 前端生成的uuid,是redis中圖片驗(yàn)證碼的key
# 2. 檢查參數(shù)
if not all([phone, image_code, image_code_uuid]):
return Response({'code': 400, 'msg': '參數(shù)不全'})
# 檢查手機(jī)號(hào)是否正確
if not re.match(r'^1[3456789]\d{9}$', phone):
return Response({"code": 999, "msg": "手機(jī)號(hào)碼不正確"})
# 3. 檢查是否發(fā)送
redis_client = get_redis_connection('img_code') # 連接redis數(shù)據(jù)庫(kù)
# phone_exists = redis_client.get(phone)
# if phone_exists:
# return Response({"code": 999, "msg": "頻繁發(fā)送, 請(qǐng)稍后再試"})
# 4.檢查圖片驗(yàn)證碼是否合法
redis_image_code = redis_client.get(image_code_uuid) # 字符串驗(yàn)證碼
if redis_image_code:
# bytes 轉(zhuǎn)成 string
redis_image_code = redis_image_code.decode() # 把uuid解碼
# 比較用戶(hù)提供的圖片內(nèi)容是否和redis中保存的一致
if image_code.upper() != redis_image_code:
return Response({'code': 999, 'msg': '圖片驗(yàn)證碼不正確'})
# 5. 發(fā)送
code = '%06d' % random.randint(100000, 999999) # 隨機(jī)6位驗(yàn)證碼
print('code===============================================', code)
# 使用容聯(lián)云短信驗(yàn)證碼
# send_resp = send_message(phone, (code, "5"))
# 使用阿里云短信驗(yàn)證碼
data = {'code': code}
# send_resp = Send_Sms(phone, data)
# 使用celery異步發(fā)送短信
Celery_Send_Sms.delay(phone, data) #delay是注冊(cè)為celery異步任務(wù)的關(guān)鍵點(diǎn)
# Celery_Send_Sms(phone, data) # delay是注冊(cè)為celery異步任務(wù)的關(guān)鍵點(diǎn)
# 5.1 保存code 到 redis中
redis_client.setex(phone, 60 * 5, code) # phone:code, 5分鐘有效期
# 5.2 從redis中刪除這個(gè)圖片驗(yàn)證碼, 以防再次被使用
redis_client.delete(image_code_uuid)
# 6.存儲(chǔ)這個(gè)已經(jīng)發(fā)送驗(yàn)證碼的手機(jī)號(hào),防止頻繁發(fā)送(使用pipeline 批量操作 )
pl = redis_client.pipeline()
pl.setex(phone, 60 * 5, code)
pl.delete(image_code_uuid)
pl.execute()
# 7. 返回結(jié)果
return Response({"code": 200, "msg": "短信發(fā)送成功"})
1.啟動(dòng)celery
首先正常啟動(dòng)你的django任務(wù),然后啟動(dòng)celery服務(wù)即可。
python manage.py celery worker --loglevel=info

出現(xiàn)上圖這個(gè)報(bào)錯(cuò)不讓超級(jí)管理員來(lái)啟動(dòng),在settings.py加入以下配置
from celery import Celery, platforms platforms.C_FORCE_ROOT = True
2.驗(yàn)證celery任務(wù)
在搞定上面的東西以后,你就可以通過(guò)postman來(lái)請(qǐng)求接口讓接口使用celery來(lái)異步執(zhí)行任務(wù)而不阻塞你的request請(qǐng)求。

到此這篇關(guān)于詳解Django中異步任務(wù)之django-celery的文章就介紹到這了,更多相關(guān)Django異步任務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Pycharm2020.1安裝中文語(yǔ)言插件的詳細(xì)教程(不需要漢化)
這篇文章主要介紹了Pycharm2020.1安裝中文語(yǔ)言插件的詳細(xì)教程,不需要漢化,本文給大家分享三種方法,在這小編推薦使用方法二,具體內(nèi)容詳情大家跟隨小編一起看看吧2020-08-08
python自動(dòng)化測(cè)試之如何解析excel文件
這篇文章主要介紹了python自動(dòng)化測(cè)試之如何解析excel文件,今天我們就把不同模塊處理excel文件的方法做個(gè)總結(jié),直接做封裝,方便我們以后直接使用,增加工作效率。,需要的朋友可以參考下2019-06-06
python TinyDB輕量級(jí)文檔導(dǎo)向數(shù)據(jù)庫(kù)輕松存儲(chǔ)訪(fǎng)問(wèn)
這篇文章主要為大家介紹了python TinyDB輕量級(jí)文檔導(dǎo)向數(shù)據(jù)庫(kù)輕松存儲(chǔ)訪(fǎng)問(wèn)數(shù)據(jù)使用探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
Python3.0 實(shí)現(xiàn)決策樹(shù)算法的流程
這篇文章主要介紹了Python3.0 實(shí)現(xiàn)決策樹(shù)算法的流程,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-08-08
Pandas Shift函數(shù)的基礎(chǔ)入門(mén)學(xué)習(xí)筆記
shift函數(shù)是對(duì)數(shù)據(jù)進(jìn)行移動(dòng)的操作,下面這篇文章主要給大家介紹了關(guān)于Pandas Shift函數(shù)的基礎(chǔ)入門(mén)學(xué)習(xí)筆記,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-11-11

