django+celery+RabbitMQ自定義多個(gè)消息隊(duì)列的實(shí)現(xiàn)
關(guān)于django celery的使用網(wǎng)上有很多文章,本文就不多做更多的說明。
本文使用版本
- python==3.8.15
- Django==3.2.4
- celery==5.2.7
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from kombu import Exchange, Queue
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'zkcelery.settings')
app = Celery('zkcelery')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
# 看了一篇文章說,如果使用redis做broker,exchange可以不配置;但如果使用rabbitMQ做broker,就必須要配置。
queue = (
Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),
Queue('q1', exchange=Exchange('e1', type='direct'), routing_key='r1'),
Queue('q2', exchange=Exchange('e2', type='direct'), routing_key='r2'),
Queue('q3', exchange=Exchange('e3', type='fanout'), routing_key='r3'),
)
# 一旦配置了route后,所有的任務(wù)名都必須要指定route,否則任務(wù)無法執(zhí)行。
# 經(jīng)過測(cè)試,route匹配是最長(zhǎng)匹配規(guī)則。
route = {
'apps.zhiding.tasks.add': {'queue': 'q1', 'routing_key': 'r1'},
'apps.zhiding.tasks.multiply': {'queue': 'q2', 'routing_key': 'r2'},
# 其它的任務(wù)名稱,匹配這條路由
# 如果以上隊(duì)列的worker服務(wù)器壞了,這些任務(wù)會(huì)被全部放進(jìn)這個(gè)隊(duì)列里,該隊(duì)列的worker將繼續(xù)處理這些任務(wù)
# 下面這條隊(duì)列一定要配置,否則其它任務(wù)無法處理。
'*': {'queue': 'default', 'routing_key': 'default'},
}
app.conf.update(CELERY_QUEUES=queue, CELERY_ROUTES=route)
tasks.py
from celery import shared_task
import time
@shared_task
def add(x, y):
time.sleep(2)
print('任務(wù)睡眠2秒后執(zhí)行了')
return x + y
@shared_task
def multiply(x, y):
time.sleep(5)
print('任務(wù)睡眠5秒后執(zhí)行了')
return x * y
@shared_task
def sub(x, y):
time.sleep(4)
print('任務(wù)睡眠4秒后執(zhí)行了')
return x - y
筆者也看了很多博文,在settings.py配置文件中寫入CELERY_QUEUES和CELERY_ROUTES,上面的配置對(duì)應(yīng)下來就是如下代碼塊:
CELERY_QUEUES = (
Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),
Queue('sq1', exchange=Exchange('sq1', type='direct'), routing_key='sq1'),
Queue('sq2', exchange=Exchange('sq2', type='direct'), routing_key='sq2'),
Queue('sq3', exchange=Exchange('sq3', type='fanout'), routing_key='sq3'),
)
CELERY_ROUTES = {
'apps.zhiding.tasks.add': {'queue': 'sq1', 'routing_key': 'sq1'},
'apps.zhiding.tasks.multiply': {'queue': 'sq2', 'routing_key': 'sq2'},
'*': {'queue': 'default', 'routing_key': 'default'},
}
但是筆者在實(shí)際使用中發(fā)現(xiàn)后面這種方式配置始終未生效,不知道是不是筆者版本的不同,沒有做更多的研究,如果你能找到問題的原因,歡迎評(píng)論交流。
啟動(dòng)worker
# 筆者使用的windows,啟動(dòng)時(shí)需要加上-P eventlet celery -A zkcelery worker -l info -P eventlet
啟動(dòng)后隊(duì)列中出現(xiàn)配置中的個(gè)隊(duì)列

同時(shí)會(huì)在rabbitmq中創(chuàng)建(如果不存在)4個(gè)隊(duì)列,交換機(jī)和相應(yīng)的綁定關(guān)系(當(dāng)然也可以直接通過rabbitmq管理端直接創(chuàng)建自己需要的隊(duì)列、交換機(jī)和綁定,具體根據(jù)個(gè)人習(xí)慣或者視工作場(chǎng)景而定選擇)

以隊(duì)列q1示例:

暫時(shí)先關(guān)閉worker,便于觀察消息隊(duì)列中的消息。
向隊(duì)列中發(fā)送幾條消息,消息均進(jìn)入到配置中指定的queue中

再次啟動(dòng)worker,隊(duì)列中的消息立馬被消費(fèi)

如何做到消費(fèi)指定的隊(duì)列中的消息,只需要啟動(dòng)的時(shí)候加上參數(shù)Q
# -Q指定消費(fèi)的隊(duì)列 # -n 指定worker節(jié)點(diǎn)的名稱,避免啟動(dòng)多個(gè)時(shí)的重名沖突 celery -A zkcelery worker -l info -Q q1 -n node1 -P eventlet
可以看到終端中queues只有q1了

q1中的消息被消費(fèi)掉了,其他隊(duì)列沒有變化

也可以同時(shí)指定多個(gè)消費(fèi)隊(duì)列
celery -A zkcelery worker -l info -Q q2,default -n node2 -P eventlet

當(dāng)然也可以在生產(chǎn)方指定推送的隊(duì)列,舉例如下:

到此這篇關(guān)于django+celery+RabbitMQ自定義多個(gè)消息隊(duì)列的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)django celery RabbitMQ消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python3導(dǎo)入CSV文件的實(shí)例(跟Python2有些許的不同)
今天小編就為大家分享一篇Python3導(dǎo)入CSV文件的實(shí)例(跟Python2有些許的不同),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-06-06
anaconda中Conda創(chuàng)建虛擬環(huán)境的實(shí)現(xiàn)步驟
在Anaconda中,可以使用conda命令來創(chuàng)建和管理虛擬環(huán)境,本文主要介紹了anaconda中Conda創(chuàng)建虛擬環(huán)境的實(shí)現(xiàn)步驟,具有一定的參考價(jià)值,感興趣的可以了解一下2023-12-12
python文件讀取時(shí)順序錯(cuò)誤的問題及解決
這篇文章主要介紹了python文件讀取時(shí)順序錯(cuò)誤的問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08
python tkinter圖形界面代碼統(tǒng)計(jì)工具
這篇文章主要為大家詳細(xì)介紹了python tkinter圖形界面代碼統(tǒng)計(jì)工具,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-09-09
python3+telnetlib實(shí)現(xiàn)簡(jiǎn)單自動(dòng)測(cè)試示例詳解
telnetlib 模塊提供一個(gè)實(shí)現(xiàn)Telnet協(xié)議的類 Telnet,本文重點(diǎn)給大家介紹python3+telnetlib實(shí)現(xiàn)簡(jiǎn)單自動(dòng)測(cè)試示例詳解,需要的朋友可以參考下2021-08-08
基于Python繪制美觀動(dòng)態(tài)圓環(huán)圖、餅圖
這篇文章主要介紹了基于Python制作美觀動(dòng)態(tài)圓環(huán)圖、餅圖,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06

