Python分布式異步任務(wù)框架Celery使用教程
一、Celery架構(gòu)介紹
Celery:芹菜?(跟翻譯沒有任何關(guān)系),分布式異步任務(wù)框架(跟其他web框架無關(guān))
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.(不支持windows)
celery服務(wù)為其他項(xiàng)目服務(wù)提供異步解決任務(wù)需求的。
架構(gòu):
分為三部分
- broker:任務(wù)中間件,用戶提交的任務(wù),存在這個(gè)里面(redis,rabbitmq)
- worker:任務(wù)執(zhí)行者,消費(fèi)者,真正執(zhí)行任務(wù)的進(jìn)程(真正干活的人)
- backend:任務(wù)結(jié)果存儲(chǔ),任務(wù)執(zhí)行后的結(jié)果(redis,rabbitmq)


celery能夠做的事:
- 異步任務(wù)(區(qū)分同步任務(wù))
- 延遲任務(wù)
- 定時(shí)任務(wù)(其他框架做)
怎么更好的理解celery?
會(huì)有兩個(gè)服務(wù)同時(shí)運(yùn)行,一個(gè)是項(xiàng)目服務(wù)(django服務(wù)),一個(gè)是celery服務(wù),項(xiàng)目服務(wù)將需要異步處理的任務(wù)交給celery服務(wù),celery就會(huì)在需要時(shí)異步完成項(xiàng)目的需求。打個(gè)比方,人是一個(gè)獨(dú)立運(yùn)行的服務(wù)(django) | 醫(yī)院也是一個(gè)獨(dú)立運(yùn)行的服務(wù)(celery)。正常情況下,人可以完成所有健康情況的動(dòng)作,不需要醫(yī)院的參與;但當(dāng)人生病時(shí),就會(huì)被醫(yī)院接收,解決人生病問題,人生病的處理方案交給醫(yī)院來解決,所有人不生病時(shí),醫(yī)院獨(dú)立運(yùn)行,人生病時(shí),醫(yī)院就來解決人生病的需求。
注:python有自己的定時(shí)任務(wù),感興趣的了解下apscheduler。
二、Celery簡(jiǎn)單使用
安裝:pip install celery==5.1.2
使用:
1.配置celery
from celery import Celery
# app=Celery('test',)
# backend='redis://:密碼@127.0.0.1:6379/1' 如果有密碼,這么寫
broker = 'redis://127.0.0.1:6379/1' # redis地址
backend = 'redis://127.0.0.1:6379/2' # redis地址
# 1 實(shí)例化得到celery對(duì)象
app = Celery(__name__, backend=backend, broker=broker)
# 2 寫一堆任務(wù)(計(jì)算a+b,挖井,砍樹),函數(shù)
# 使用裝飾器包裹任務(wù)(函數(shù))
@app.task()
def add(a, b):
import time
time.sleep(2)
return a + b2.提交任務(wù)
# from celery_task import app import celery_task # 1 同步執(zhí)行 # res = celery_task.add(2, 3) # 普通的同步任務(wù),同步執(zhí)行任務(wù) # print(res)
2 異步任務(wù):
第一步:提交(使用任務(wù)名.apply_async(參數(shù)))
結(jié)果是任務(wù)id號(hào),唯一標(biāo)識(shí)這個(gè)任務(wù)
# res = celery_task.add.apply_async(args=[2, 3])
res = celery_task.add.apply_async(kwargs={'a':2,'b':3})
print(res) # abab1ad3-0e58-4faa-bc05-14d157dc8217第二步:讓worker執(zhí)行—>結(jié)果存到redis
通過命令啟動(dòng),非windows:
5.x之前這么啟動(dòng)
命令:celery worker -A celery_task -l info
5.x以后
命令:celery -A celery_task worker -l info
windows:
pip3 install eventlet
5.x之前這么啟動(dòng)
命令:celery worker -A celery_task -l info -P eventlet
5.x以后
命令:celery -A celery_task worker -l info -P eventlet
3.查看任務(wù)執(zhí)行結(jié)果
from celery_task import app
from celery.result import AsyncResult
id = 'abab1ad3-0e58-4faa-bc05-14d157dc8217'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
print('任務(wù)執(zhí)行成功了')
result = a.get() # 異步任務(wù)執(zhí)行的結(jié)果
print(result)
elif a.failed():
print('任務(wù)失敗')
elif a.status == 'PENDING':
print('任務(wù)等待中被執(zhí)行')
elif a.status == 'RETRY':
print('任務(wù)異常后正在重試')
elif a.status == 'STARTED':
print('任務(wù)已經(jīng)開始被執(zhí)行')三、Celery包結(jié)構(gòu)
目錄結(jié)構(gòu):
-celery_task # 包名
__init__.py
celery.py # app所在py文件
course_task.py # 任務(wù)
order_task.py # 任務(wù)
user_task.py # 任務(wù)
提交任務(wù).py # 提交任務(wù)
查看結(jié)果.py # 查看結(jié)果
創(chuàng)建多個(gè)任務(wù):
celery_task /celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include 是一個(gè)列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker,include=[
'celery_task.course_task',
'celery_task.order_task',
'celery_task.user_task',
])
# 原來,任務(wù)寫在這個(gè)py文件中
# 后期任務(wù)非常多,可能有用戶相關(guān)任務(wù),課程相關(guān)任務(wù),訂單相關(guān)任務(wù)。。。celery_task /任務(wù).py
user_task.py
import time
from .celery import app
# 發(fā)送短信任務(wù)
@app.task()
def send_sms(phone, code):
time.sleep(3) # 模擬發(fā)送短信延遲
print('短信發(fā)送成功,手機(jī)號(hào)是:%s,驗(yàn)證碼是:%s' % (phone, code))
return '短信發(fā)送成功'order_task.py
from .celery import app
# 生成訂單任務(wù)
@app.task()
def make_order():
with open(r'D:\py18\luffy_api\script\2 celery的包結(jié)構(gòu)\celery_task\order.txt', 'a', encoding='utf-8') as f:
f.write('生成一條訂單\n')
return Truecourse_task.py
from .celery import app
@app.task()
def add(a,b):
return a+b提交多個(gè)任務(wù):
from celery_task import user_task,order_task # 提交一個(gè)發(fā)送短信任務(wù) # res = user_task.send_sms.apply_async(args=['18972374345', '8888']) # print(res) # 提交一個(gè)生成訂單任務(wù) # res=order_task.make_order.apply_async() # print(res)
查看結(jié)果:
from celery_task.celery import app
from celery.result import AsyncResult
id = '0f283e22-e8d0-40a6-a8ed-8998038bc7a3'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
print(app.conf)
if a.successful():
print('任務(wù)執(zhí)行成功了')
result = a.get() # 異步任務(wù)執(zhí)行的結(jié)果
print(result)
elif a.failed():
print('任務(wù)失敗')
elif a.status == 'PENDING':
print('任務(wù)等待中被執(zhí)行')
elif a.status == 'RETRY':
print('任務(wù)異常后正在重試')
elif a.status == 'STARTED':
print('任務(wù)已經(jīng)開始被執(zhí)行')四、Celery延遲任務(wù)
# 添加延遲任務(wù)方式一:
# from datetime import datetime, timedelta
# datetime.utcnow() 獲取當(dāng)前的utc時(shí)間
# eta=datetime.utcnow() + timedelta(seconds=50) # 50s后的utc時(shí)間
# 10s后,發(fā)送短信
res=user_task.send_sms.apply_async(args=('12345566677', '8888'), eta=eta)
print(res)
# 使用第二種方式執(zhí)行異步任務(wù)(兩者傳參不同;不寫時(shí)間,就表示立即執(zhí)行):
res=user_task.send_sms.delay('12345566677', '8888')
print(res)五、Celery定時(shí)任務(wù)
第一步:celery.py中寫入
# 第一步,在包(celery_task)下的celey.py中寫入
###修改celery的配置信息 app.conf整個(gè)celery的配置信息
# 時(shí)區(qū)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
####配置定時(shí)任務(wù)
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'send_sms_every_3_seconds': {
'task': 'celery_task.user_task.send_sms', # 指定執(zhí)行的是哪個(gè)任務(wù)
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點(diǎn)
'args': ('18953675221', '8888'),
},
'make_order_every_5_seconds': {
'task': 'celery_task.order_task.make_order', # 指定執(zhí)行的是哪個(gè)任務(wù)
'schedule': timedelta(seconds=5),
},
'add_every_1_seconds': {
'task': 'celery_task.course_task.add', # 指定執(zhí)行的是哪個(gè)任務(wù)
'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點(diǎn)
'args': (3, 5),
},
}第二步:?jiǎn)?dòng)worker
# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet
如果beat沒有啟動(dòng),worker是沒有活干的,需要啟動(dòng)beat,worker才能干活,和beat啟動(dòng)順序無先后
第三步:?jiǎn)?dòng)beat
# celery beat -A celery_task -l
celery -A celery_task beat -l info
六、Django中集成Celery
第一種方式使用django-celery(了解):
第三方把django和celery集成起來,方便我們使用,但是,第三方寫的包的版本,跟celery和django版本完全對(duì)應(yīng)。
我們自己使用包結(jié)構(gòu)集成到django中:
第一步,把寫好的包,直接復(fù)制到項(xiàng)目根路徑
第二步,在視圖類中(函數(shù)中)
from celery_task.user_task import send_sms
def test(request):
mobile = request.GET.get('mobile')
code = '9999'
res = send_sms.delay(mobile, code) # 同步發(fā)送假設(shè)3分支鐘,異步發(fā)送,直接就返回id了,是否成功不知道,后期通過id查詢
print(res)
return HttpResponse(res)到此這篇關(guān)于Python分布式異步任務(wù)框架Celery使用教程的文章就介紹到這了,更多相關(guān)Python Celery內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python必備shelve與dbm本地持久化存儲(chǔ)數(shù)據(jù)的兩個(gè)強(qiáng)大工具
當(dāng)涉及存儲(chǔ)大量數(shù)據(jù)并且需要高效訪問時(shí),shelve和dbm模塊是Python中用于本地持久化存儲(chǔ)數(shù)據(jù)的兩個(gè)強(qiáng)大工具,它們?cè)试S開發(fā)人員以鍵值對(duì)的形式存儲(chǔ)數(shù)據(jù),并支持快速的檢索和更新操作,在本文將深入探討這兩個(gè)模塊,展示它們的優(yōu)勢(shì)和應(yīng)用場(chǎng)景2024-01-01
Tensorflow: 從checkpoint文件中讀取tensor方式
今天小編就為大家分享一篇Tensorflow: 從checkpoint文件中讀取tensor方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-02-02
關(guān)于tf.TFRecordReader()函數(shù)的用法解析
今天小編就為大家分享一篇關(guān)于tf.TFRecordReader()函數(shù)的用法解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-02-02
淺談python中頻繁的print到底能浪費(fèi)多長(zhǎng)時(shí)間
今天小編就為大家分享一篇淺談python中頻繁的print到底能浪費(fèi)多長(zhǎng)時(shí)間,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-02-02
python logging.basicConfig不生效的原因及解決
今天小編就為大家分享一篇python logging.basicConfig不生效的原因及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-02-02
pycharm遠(yuǎn)程連接服務(wù)器運(yùn)行pytorch的過程詳解
這篇文章主要介紹了在Linux環(huán)境下使用Anaconda管理不同版本的Python環(huán)境,并通過PyCharm遠(yuǎn)程連接服務(wù)器來運(yùn)行PyTorch的過程,包括安裝PyTorch、CUDA以及配置PyCharm遠(yuǎn)程開發(fā)環(huán)境的詳細(xì)步驟,需要的朋友可以參考下2025-02-02

