Python+Pika+RabbitMQ環(huán)境部署及實(shí)現(xiàn)工作隊(duì)列的實(shí)例教程
rabbitmq中文翻譯的話,主要還是mq字母上:Message Queue,即消息隊(duì)列的意思。前面還有個rabbit單詞,就是兔子的意思,和python語言叫python一樣,老外還是蠻幽默的。rabbitmq服務(wù)類似于mysql、apache服務(wù),只是提供的功能不一樣。rabbimq是用來提供發(fā)送消息的服務(wù),可以用在不同的應(yīng)用程序之間進(jìn)行通信。
安裝rabbitmq
先來安裝下rabbitmq,在ubuntu 12.04下可以直接通過apt-get安裝:
sudo apt-get install rabbitmq-server
安裝好后,rabbitmq服務(wù)就已經(jīng)啟動好了。接下來看下python編寫Hello World!的實(shí)例。實(shí)例的內(nèi)容就是從send.py發(fā)送“Hello World!”到rabbitmq,receive.py從rabbitmq接收send.py發(fā)送的信息。

其中P表示produce,生產(chǎn)者的意思,也可以稱為發(fā)送者,實(shí)例中表現(xiàn)為send.py;C表示consumer,消費(fèi)者的意思,也可以稱為接收者,實(shí)例中表現(xiàn)為receive.py;中間紅色的表示隊(duì)列的意思,實(shí)例中表現(xiàn)為hello隊(duì)列。
python使用rabbitmq服務(wù),可以使用現(xiàn)成的類庫pika、txAMQP或者py-amqplib,這里選擇了pika。
安裝pika
安裝pika可以使用pip來進(jìn)行安裝,pip是python的軟件管理包,如果沒有安裝,可以通過apt-get安裝
sudo apt-get install python-pip
通過pip安裝pika:
sudo pip install pika
send.py代碼
連接到rabbitmq服務(wù)器,因?yàn)槭窃诒镜販y試,所以就用localhost就可以了。
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
聲明消息隊(duì)列,消息將在這個隊(duì)列中進(jìn)行傳遞。如果將消息發(fā)送到不存在的隊(duì)列,rabbitmq將會自動清除這些消息。
channel.queue_declare(queue='hello')
發(fā)送消息到上面聲明的hello隊(duì)列,其中exchange表示交換器,能精確指定消息應(yīng)該發(fā)送到哪個隊(duì)列,routing_key設(shè)置為隊(duì)列的名稱,body就是發(fā)送的內(nèi)容,具體發(fā)送細(xì)節(jié)暫時先不關(guān)注。
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
關(guān)閉連接
connection.close()
完整代碼
#!/usr/bin/env python
#coding=utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
先來執(zhí)行下這個程序,執(zhí)行成功的話,rabbitmqctl應(yīng)該成功增加了hello隊(duì)列,并且隊(duì)列里應(yīng)該有一條信息,用rabbitmqctl命令來查看下
rabbitmqctl list_queues
在筆者的電腦上輸出如下信息:

確實(shí)有一個hello隊(duì)列,并且隊(duì)列里有一條信息。接下來用receive.py來獲取隊(duì)列里的信息。
receive.py代碼
和send.py的前面兩個步驟一樣,都是要先連接服務(wù)器,然后聲明消息的隊(duì)列,這里就不再貼同樣代碼了。
接收消息更為復(fù)雜一些,需要定義一個回調(diào)函數(shù)來處理,這邊的回調(diào)函數(shù)就是將信息打印出來。
def callback(ch, method, properties, body): print "Received %r" % (body,)
告訴rabbitmq使用callback來接收信息
channel.basic_consume(callback, queue='hello', no_ack=True)
開始接收信息,并進(jìn)入阻塞狀態(tài),隊(duì)列里有信息才會調(diào)用callback進(jìn)行處理。按ctrl+c退出。
channel.start_consuming()
完整代碼
#!/usr/bin/env python
#coding=utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback, queue='hello', no_ack=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
執(zhí)行程序,就能夠接收到隊(duì)列hello里的消息Hello World!,然后打印在屏幕上。換一個終端,再次執(zhí)行send.py,可以看到receive.py這邊會再次接收到信息。
工作隊(duì)列示例
1.準(zhǔn)備工作(Preparation)
在實(shí)例程序中,用new_task.py來模擬任務(wù)分配者, worker.py來模擬工作者。
修改send.py,從命令行參數(shù)里接收信息,并發(fā)送
import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print " [x] Sent %r" % (message,)
修改receive.py的回調(diào)函數(shù)。
import time
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
這邊先打開兩個終端,都運(yùn)行worker.py,處于監(jiān)聽狀態(tài),這邊就相當(dāng)于兩個工作者。打開第三個終端,運(yùn)行new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
另外一個工作者接收到2個任務(wù) :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
從上面來看,每個工作者,都會依次分配到任務(wù)。那么如果一個工作者,在處理任務(wù)的時候掛掉,這個任務(wù)就沒有完成,應(yīng)當(dāng)交由其他工作者處理。所以應(yīng)當(dāng)有一種機(jī)制,當(dāng)一個工作者完成任務(wù)時,會反饋消息。
2.消息確認(rèn)(Message acknowledgment)
消息確認(rèn)就是當(dāng)工作者完成任務(wù)后,會反饋給rabbitmq。修改worker.py中的回調(diào)函數(shù):
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag)
這邊停頓5秒,可以方便ctrl+c退出。
去除no_ack=True參數(shù)或者設(shè)置為False也可以。
channel.basic_consume(callback, queue='hello', no_ack=False)
用這個代碼運(yùn)行,即使其中一個工作者ctrl+c退出后,正在執(zhí)行的任務(wù)也不會丟失,rabbitmq會將任務(wù)重新分配給其他工作者。
3.消息持久化存儲(Message durability)
雖然有了消息反饋機(jī)制,但是如果rabbitmq自身掛掉的話,那么任務(wù)還是會丟失。所以需要將任務(wù)持久化存儲起來。聲明持久化存儲:
channel.queue_declare(queue='hello', durable=True)
但是這個程序會執(zhí)行錯誤,因?yàn)閔ello這個隊(duì)列已經(jīng)存在,并且是非持久化的,rabbitmq不允許使用不同的參數(shù)來重新定義存在的隊(duì)列。重新定義一個隊(duì)列:
channel.queue_declare(queue='task_queue', durable=True)
在發(fā)送任務(wù)的時候,用delivery_mode=2來標(biāo)記任務(wù)為持久化存儲:
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
4.公平調(diào)度(Fair dispatch)
上面實(shí)例中,雖然每個工作者是依次分配到任務(wù),但是每個任務(wù)不一定一樣??赡苡械娜蝿?wù)比較重,執(zhí)行時間比較久;有的任務(wù)比較輕,執(zhí)行時間比較短。如果能公平調(diào)度就最好了,使用basic_qos設(shè)置prefetch_count=1,使得rabbitmq不會在同一時間給工作者分配多個任務(wù),即只有工作者完成任務(wù)之后,才會再次接收到任務(wù)。
channel.basic_qos(prefetch_count=1)
new_task.py完整代碼
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代碼
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
相關(guān)文章
安裝pytorch報錯torch.cuda.is_available()=false問題的解決過程
最近想用pytorch,因此裝了pytorch,但是碰到了問題,下面這篇文章主要給大家介紹了關(guān)于安裝pytorch報錯torch.cuda.is_available()=false問題的解決過程,需要的朋友可以參考下2022-05-05
conda管理Python虛擬環(huán)境的實(shí)現(xiàn)
本文主要介紹了conda管理Python虛擬環(huán)境的實(shí)現(xiàn),主要包括使用conda工具創(chuàng)建、查看和刪除Python虛擬環(huán)境,具有一定的參考價值,感興趣的可以了解一下2024-01-01
PyTorch詳解經(jīng)典網(wǎng)絡(luò)ResNet實(shí)現(xiàn)流程
ResNet全稱residual neural network,主要是解決過深的網(wǎng)絡(luò)帶來的梯度彌散,梯度爆炸,網(wǎng)絡(luò)退化(即網(wǎng)絡(luò)層數(shù)越深時,在數(shù)據(jù)集上表現(xiàn)的性能卻越差)的問題2022-05-05
Python bsddb模塊操作Berkeley DB數(shù)據(jù)庫介紹
這篇文章主要介紹了Python bsddb模塊操作Berkeley DB數(shù)據(jù)庫介紹,這里簡單介紹一些關(guān)于bsddb的使用方法,需要的朋友可以參考下2015-04-04
YOLOv5目標(biāo)檢測之a(chǎn)nchor設(shè)定
在訓(xùn)練yolo網(wǎng)絡(luò)檢測目標(biāo)時,需要根據(jù)待檢測目標(biāo)的位置大小分布情況對anchor進(jìn)行調(diào)整,使其檢測效果盡可能提高,下面這篇文章主要給大家介紹了關(guān)于YOLOv5目標(biāo)檢測之a(chǎn)nchor設(shè)定的相關(guān)資料,需要的朋友可以參考下2022-05-05
python中面向?qū)ο蟮淖⒁恻c(diǎn)概述總結(jié)
大家好,本篇文章主要講的是python中面向?qū)ο蟮淖⒁恻c(diǎn)概述總結(jié),感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下2022-02-02
Python3 使用map()批量的轉(zhuǎn)換數(shù)據(jù)類型,如str轉(zhuǎn)float的實(shí)現(xiàn)
今天小編就為大家分享一篇Python3 使用map()批量的轉(zhuǎn)換數(shù)據(jù)類型,如str轉(zhuǎn)float的實(shí)現(xiàn),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-11-11
python實(shí)現(xiàn)兩個一維列表合并成一個二維列表
今天小編就為大家分享一篇python實(shí)現(xiàn)兩個一維列表合并成一個二維列表,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-12-12
python實(shí)現(xiàn)獲取aws route53域名信息的方法
最近由于工作原因接觸到aws的服務(wù),我需要實(shí)時獲取所有的域名信息,用于對其進(jìn)行掃描,因此寫了一個自動化爬取腳本 給需要的人分享,對python獲取aws route53域名信息相關(guān)知識感興趣的朋友一起看看吧2023-12-12
基于numpy.random.randn()與rand()的區(qū)別詳解
下面小編就為大家分享一篇基于numpy.random.randn()與rand()的區(qū)別詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04

