python中celery的基本使用詳情
1.基本介紹
Celery 是由Python 編寫的簡單,靈活,可靠的用來處理大量信息的分布式系統(tǒng),它同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工具。Celery 專注于實(shí)時(shí)任務(wù)處理,支持任務(wù)調(diào)度。
簡單的說,它就是一個(gè)分布式隊(duì)列的管理工具,用celery提供的接口快速實(shí)現(xiàn)并管理一個(gè)分布式的任務(wù)隊(duì)列。
有一點(diǎn)我們需要搞清楚,Celery 本身并不是任務(wù)隊(duì)列,它是一個(gè)分布式隊(duì)列的管理工具,Celery封裝好了操作常見任務(wù)隊(duì)列的各種操作,比如說從監(jiān)聽某個(gè)任務(wù)隊(duì)列并從該隊(duì)列中拿到數(shù)據(jù)進(jìn)行消費(fèi)。
2.使用場景
它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機(jī)上運(yùn)行。我們通常使用它來實(shí)現(xiàn)異步任務(wù)(async task)和定時(shí)任務(wù)(crontab)。
- 異步任務(wù): 將耗時(shí)操作任務(wù)提交給
Celery去異步執(zhí)行,比如發(fā)送短信/郵件、消息推送、音視頻處理等等 - 定時(shí)任務(wù): 定時(shí)執(zhí)行某件事情,比如每天數(shù)據(jù)統(tǒng)計(jì)
3.工作流程和組成部分
這里用一張圖片說明下:

Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(task result store)組成。
消息中間件:
Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成。包括RabbitMQ, Redis等等,官方推薦用rabbitMQ,因?yàn)樗志梅€(wěn)定。
任務(wù)執(zhí)行單元:
Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中。
任務(wù)結(jié)果存儲:
Task result store用來存儲Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲任務(wù)的結(jié)果,包括AMQP, redis等
另外, Celery還支持不同的并發(fā)和序列化的手段。
并發(fā):Prefork, Eventlet, gevent, threads/single threaded
序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等 先安裝模塊
pip install celery pip install redis
4.Celery執(zhí)行異步任務(wù)
4.1 基礎(chǔ)使用
這里項(xiàng)目結(jié)構(gòu)如下:

第一步:先創(chuàng)建celery相關(guān)配置配置celery_object.py
import celery
# 執(zhí)行如下命令: celery -A celery_object worker -l info
backend = "redis://127.0.0.1:6379/4" # 設(shè)置redis的4號數(shù)據(jù)庫來存放結(jié)果
broker = "redis://127.0.0.1:6379/5" # 設(shè)置redis的5號數(shù)據(jù)庫存放消息中間件
celery_app = celery.Celery(
"celery_demo",
backend=backend,
broker=broker,
include=[
"celery_task",
],
)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
celery_app.conf.timezone = "Asia/Shanghai" # 時(shí)區(qū)
celery_app.conf.enable_utc = False # 是否使用UTC參數(shù)說明:
- backend 就是異步任務(wù)執(zhí)行完成以后,結(jié)果存放的地方。
- broker 就是具體執(zhí)行任務(wù)的工作節(jié)點(diǎn)。
- celery.Celery()方法是實(shí)例化一個(gè)celery對象。
第二步:創(chuàng)建任務(wù)相關(guān)的文件celery_task.py
import time
from celery_object import celery_app
@celery_app.task
def send_email(name):
print("向%s發(fā)送郵件..." % name)
time.sleep(5)
print("向%s發(fā)送郵件完成" % name)
return f"成功拿到{name}發(fā)送的郵件!"
@celery_app.task
def send_msg(name):
print("向%s發(fā)送短信..." % name)
time.sleep(5)
print("向%s發(fā)送短信完成" % name)
return f"成功拿到{name}發(fā)送的短信!"通過@celery_app.task這樣的裝飾器,成功的把對應(yīng)的函數(shù)變成對應(yīng)celery的異步worker函數(shù)。
緊接著我們在項(xiàng)目當(dāng)前所在的目錄執(zhí)行命令:
celery -A celery_object worker -l info
- -A 指的是application應(yīng)用對象
- worker 就是工作人(固定寫法)
- -l 指的是日志級別,這里是打印
info級別的日志
之后就可以有下面的輸出顯示就代表celery啟動(dòng)成功:

之后我們就可以向celery生產(chǎn)任務(wù)了,創(chuàng)建produce_result.py文件。
from celery_task import send_email, send_msg
if __name__ == "__main__":
for i in range(10):
result = send_email.delay(f"張三{i}")
print(result.id)
result2 = send_msg.delay(f"李四{i}")
print(result2.id)運(yùn)行生產(chǎn)任務(wù)的程序,會看到如下的數(shù)據(jù),這里打印的就是任務(wù)ID。

然后在終端可以看到下面的東西,就代表celery成功的拿到隊(duì)列中任務(wù) 并進(jìn)行消費(fèi)了。

然后打開我們的redis可以看到有對應(yīng)的數(shù)據(jù)記錄。

與此同時(shí) 我們還可以查看celery任務(wù)ID的狀態(tài),check_result.py寫入如下:
from celery.result import AsyncResult
from celery_object import celery_app
async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app)
print("任務(wù)狀態(tài):", async_result.status)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 將結(jié)果刪除
elif async_result.failed():
print("執(zhí)行失敗")
elif async_result.status == "PENDING":
print("任務(wù)等待中被執(zhí)行")
elif async_result.status == "RETRY":
print("任務(wù)異常后正在重試")
elif async_result.status == "STARTED":
print("任務(wù)已經(jīng)開始被執(zhí)行")運(yùn)行結(jié)果:
任務(wù)狀態(tài): SUCCESS
成功拿到李四0發(fā)送的短信!
到此這篇關(guān)于python中celery的基本使用詳情的文章就介紹到這了,更多相關(guān)python celery內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python網(wǎng)絡(luò)爬蟲精解之pyquery的使用說明
PyQuery是一個(gè)類似于jQuery的解析網(wǎng)頁工具,使用lxml操作xml和html文檔,它的語法和jQuery很像。和XPATH,Beautiful Soup比起來,PyQuery更加靈活,提供增加節(jié)點(diǎn)的class信息,移除某個(gè)節(jié)點(diǎn),提取文本信息等功能2021-09-09
Python實(shí)現(xiàn)合并兩個(gè)字典的8種方法
Python有多種方法可以通過使用各種函數(shù)和構(gòu)造函數(shù)來合并字典,本文主要介紹了Python實(shí)現(xiàn)合并兩個(gè)字典的8種方法,具有一定的參考價(jià)值,感興趣的可以了解一下2024-07-07
解決Ubuntu pip 安裝 mysql-python包出錯(cuò)的問題
今天小編就為大家分享一篇解決Ubuntu pip 安裝 mysql-python包出錯(cuò)的問題,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-06-06
詳解Django中的ifequal和ifnotequal標(biāo)簽使用
這篇文章主要介紹了詳解Django中的ifequal和ifnotequal標(biāo)簽使用,Django是重多高人氣Python框架中最為著名的一個(gè),需要的朋友可以參考下2015-07-07
Python list運(yùn)算操作代碼實(shí)例解析
這篇文章主要介紹了Python list運(yùn)算操作代碼實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01
python+Tesseract OCR實(shí)現(xiàn)截屏識別文字
pytesseract Python常用pytesseract進(jìn)行圖片上的文字識別,本文主要介紹了python+Tesseract?OCR實(shí)現(xiàn)截屏識別文字,具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11
pycharm安裝django框架詳細(xì)圖文教程(指定版本)
這篇文章主要給大家介紹了關(guān)于pycharm安裝django框架(指定版本)的相關(guān)資料,PyCharm是一種Python?IDE,帶有一整套可以幫助用戶在使用Python語言開發(fā)時(shí)提高其效率的工具,需要的朋友可以參考下2023-10-10
Python標(biāo)準(zhǔn)庫之itertools庫的使用方法
Python提供了一個(gè)非常棒的模塊用于創(chuàng)建自定義的迭代器,這個(gè)模塊就是 itertools。itertools 提供的工具相當(dāng)高效且節(jié)省內(nèi)存,下面這篇文章主要給大家介紹了關(guān)于Python標(biāo)準(zhǔn)庫之itertools庫使用的相關(guān)資料,需要的朋友可以參考下。2017-09-09
Python使用itertools模塊實(shí)現(xiàn)排列組合功能示例
這篇文章主要介紹了Python使用itertools模塊實(shí)現(xiàn)排列組合功能,涉及Python基于itertools模塊product、permutations與combinations_with_replacement方法進(jìn)行排列、組合等相關(guān)操作實(shí)現(xiàn)技巧,需要的朋友可以參考下2018-07-07

