rabbitmq(中間消息代理)在python中的使用詳解
在之前的有關(guān)線程,進程的博客中,我們介紹了它們各自在同一個程序中的通信方法。但是不同程序,甚至不同編程語言所寫的應(yīng)用軟件之間的通信,以前所介紹的線程、進程隊列便不再適用了;此種情況便只能使用socket編程了,然而不同程序之間的通信便不再像線程進程之間的那么簡單了,要考慮多種情況(比如其中一方斷線另一方如何處理;消息群發(fā),多個程序之間的通信等等),如果每遇到一次程序間的通信,便要根據(jù)不同情況編寫不同的socket,還要維護、完善這個socket這會使得編程人員的工作量大大增加,也使得程序更易崩潰。所以,一般遇到這種情況,便使用消息隊列MQ(Message Queue),那么問題來了。
1. 什么是消息隊列MQ?
MQ是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。應(yīng)用程序通過讀出(寫入)隊列的消息(針對應(yīng)用程序的數(shù)據(jù))來通信,而無需使用專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進行通信,而不是通過直接調(diào)用彼此來通信,排隊指的是應(yīng)用程序通過 隊列來通信。隊列的使用排除了接收和發(fā)送應(yīng)用程序同時執(zhí)行的要求。
2. 什么是rabbitmq?如何使用它?
RabbitMQ是流行的開源消息隊列系統(tǒng),用erlang語言開發(fā)。RabbitMQ是AMQP(高級消息隊列協(xié)議)的標準實現(xiàn)。
RabbitMQ也是前面所提到的生產(chǎn)者消費者模型,一端發(fā)送消息(生產(chǎn)任務(wù)),一端接收消息(處理任務(wù))。
rabbitmq的詳細使用(包括各種系統(tǒng)的安裝配置)可參見其官方文檔:http://www.rabbitmq.com/documentation.html
由于應(yīng)用程序之間的通信情況異常復(fù)雜,rabbitmq支持的編程語言有10多種,所以在此博客中不可能完全演示rabbitmq的所有使用。本片博客將會介紹rabbitmq在python中的基本使用,如果你只想使用rabbitmq完成一些簡單的任務(wù),則本篇博客足以滿足你的需求;如果你想深入學(xué)習(xí)了解rabbitmq的工作原理,那么讀完本篇博客,你可以更容易的讀懂rabbitmq的官方文檔;當然這些只限于你在使用python編程。
在python中我們使用pika(第三方模塊,使用pip安裝即可使用)模塊進行rabbitmq的操作,接下來,使用python實現(xiàn)一個rabbitmq最簡單的通信。
In the diagram below, "P" is our producer and "C" is our consumer. The box in the middle is a queue - a message buffer that RabbitMQ keeps on behalf of the consumer.
Our overall design will look like:

Producer sends messages to the "hello" queue. The consumer receives messages from that queue.
例一(簡單的消息收發(fā)):
Sending

Our first programsend.pywill send a single message to the queue. The first thing we need to do is to establish a connection with RabbitMQ server.
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 建立程序與rabbitmq的連接
channel = connection.channel()
channel.queue_declare(queue='hello') # 定義hello隊列
channel.basic_publish(exchange='',
routing_key='hello', # 告訴rabbitmq將消息發(fā)送到hello隊列中
body='Hello world!') # 發(fā)送消息的內(nèi)容
print(" [x] Sent 'Hello World!'")
connection.close() # 關(guān)閉與rabbitmq的連接

Our second programreceive.pywill receive messages from the queue and print them on the screen.
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 建立程序與rabbitmq的連接
channel = connection.channel()
# 在接收端定義隊列,參數(shù)與發(fā)送端的相同
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
"""
收到消息調(diào)用callback處理消息
:param ch:
:param method:
:param properties:
:param body:
:return:
"""
print(" [x] received %r" % body)
# time.sleep(30)
print("Done....")
channel.basic_consume(callback,
queue='hello', # 告訴rabbitmq此程序從hello隊列中接收消息
no_ack=True)
# channel.basic_consume(callback,
# queue='hello')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始接收,未收到消息阻塞
注1:我們可以打開time.sleep()的注釋(模仿任務(wù)處理所需的時間),將no_ack設(shè)為默認值(不傳參數(shù)),同時運行多個receive.py, 運行send.py發(fā)一次消息,第一個開始運行的receive.py接收到消息,開始處理任務(wù),如果中途宕機(任務(wù)未處理完);那么第二個開始運行的receive.py就會接收到消息,開始處理任務(wù);如果第二個也宕機了,則第三個繼續(xù);如果依次所有運行的receive都宕機(任務(wù)未處理完)了,則下次開始運行的第一個receive.py將繼續(xù)接收消息處理任務(wù),這個機制防止了一些必須完成的任務(wù)由于處理任務(wù)的程序異常終止導(dǎo)致任務(wù)不能完成。如果將no_ack設(shè)為True,中途宕機,則后面的接收端不會再接收消息處理任務(wù)。
注2:如果發(fā)送端不停的發(fā)消息,則接收端分別是第一個開始運行的接收,第二個開始運行的接收,第三個開始運行接收,依次接收,這是rabbitmq的消息輪循機制(相當于負載均衡,防止一個接收端接收過多任務(wù)卡死,當然這種機制存在弊端,就是如果就收端機器有的配置高有的配置低,就會使配置高的機器得不到充分利用而配置低的機器一直在工作)。這一點可以啟動多個receive.py,多次運行send.py驗證。
上面的例子我們介紹了消息的接收端(即任務(wù)的處理端)宕機,我們該如何處理。接下來,我們將重點放在消息的發(fā)送端(即服務(wù)端),與接收端不同,如果發(fā)送端宕機,則會丟失存儲消息的隊列,存儲的消息(要發(fā)送給接收端處理的任務(wù)),這些信息一旦丟失會造成巨大的損失,所以下面的重點就是消息的持久化,即發(fā)送端異常終止,重啟服務(wù)后,隊列,消息都將自動加載進服務(wù)里。其實只要將上面的代碼稍微修改就可實現(xiàn)。
例二(消息的持久化):
Sending:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True) #使隊列持久化
message = "Hello World"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, #使消息持久化
))
print(" [x] Sent %r" % message)
connection.close()
Receiving:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True) #再次申明隊列,和發(fā)送端參數(shù)應(yīng)一樣
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] received %r" % body)
time.sleep(2)
print(" [x] Done")
# 因為沒有設(shè)置no_ask=True, 所以需要告訴rabbitmq消息已經(jīng)處理完畢,rabbitmq將消息移出隊列。
ch.basic_ack(delivery_tag=method.delivery_tag)
#同一時間worker只接收一條消息,等這條消息處理完在接收下一條
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
注1:worker.py中的代碼如果不設(shè)置,則new_task.py意外終止在重啟后,worker會同時接收終止前沒有處理的所有消息。兩個程序中的queue設(shè)置的參數(shù)要相同,否則程序出錯。no_ask=True如果沒設(shè)置,則worker.py中的ch.basic_ack(delivery_tag=method.delivery_tag)這行代碼至關(guān)重要,如果不寫,則不管接收的消息有沒有處理完,此消息將一直存在與隊列中。
注2:這句代碼---channel.basic_qos(prefetch_count=1),解決了上例中消息輪循機制的代碼,即接收端(任務(wù)的處理端)每次只接收一個任務(wù)(參數(shù)為幾接收幾個任務(wù)),處理完成后通過向發(fā)送端的匯報(即注1中的代碼)來接收下一個任務(wù),如果有任務(wù)正在處理中它不再接收新的任務(wù)。
前面所介紹的例一,例二都是一條消息,只能被一個接收端收到。那么該如何實現(xiàn)一條消息多個接收端同時收到(即消息群發(fā)或著叫廣播模式)呢?
其實,在rabbitmq中只有consumer(消費者,即接收端)與queue綁定,對于producer(生產(chǎn)者,即發(fā)送端)只是將消息發(fā)送到特定的隊列。consumer從與自己相關(guān)的queue中讀取消息而已。所以要實現(xiàn)消息群發(fā),只需要將同一條放到多個消費者隊列即可。在rabbitmq中這個工作由exchange來做,它可以設(shè)定三種類型,它們分別實現(xiàn)了不同的需求,我們分別來介紹。
例三(exchange的類型為fanout):
當exchange的類型為fanout時,所有綁定這個exchange的隊列都會收到發(fā)來的消息。

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 申明一個exchange,兩個參數(shù)分別為exchange的名字和類型;當exchang='fanout'時,所有綁定到此exchange的消費者隊列都將收到消息
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 消息可以在命令行啟動腳本時以參數(shù)的形式傳入
# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = 'Hello World!'
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 隨機生成一個queue,此queue唯一,且在連接端開后自動銷毀
result = channel.queue_declare(exclusive=True)
# 得到隨機生成消費者隊列的名字
queue_name = result.method.queue
# 將消費者隊列與exchange綁定
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] received %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
注1:emit_log.py為消息的發(fā)送端,receive_logs.py為消息的接收端??梢酝瑫r運行多個receive_logs.py,當emit_log.py發(fā)送消息時,可以發(fā)現(xiàn)所有正在運行的receive_logs.py都會收到來自發(fā)送端的消息。
注2:類似與廣播,如果消息發(fā)送時,接收端沒有運行,那么它將不會收到此條消息,即消息的廣播是即時的。
例四(exchange的類型為direct):
當exchange的類型為direct時,發(fā)送端和接收端都要指明消息的級別,接收端只能接收到被指明級別的消息。

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
# 命令行啟動時,以參數(shù)的的形式傳入發(fā)送消息的級別,未傳怎默認設(shè)置未info
# severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
# 命令行啟動時,以參數(shù)的的形式傳入發(fā)送消息的內(nèi)容,未傳怎默認設(shè)置Hello World!
# message = ' '.join(sys.argv[2:]) or 'Hello World!'
# 演示使用,實際運用應(yīng)用上面的方式設(shè)置消息級別
severity = 'info' #作為例子直接將消息的級別設(shè)置為info
# severity = 'warning'
message = 'Hello World'
#使用exchang的direct模式時,routing_key的值為消息的級別
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 命令行啟動時以參數(shù)的形式傳入要接收哪個級別的消息,可以傳入多個級別
# severities = sys.argv[1:]
# 演示使用,實際運用應(yīng)該用上面的方式指明消息級別
# 作為演示,直接設(shè)置兩個接收級別,info 和 warning
severities = ['info', 'warning']
if not severities:
"""如果要接收消息的級別不存在則提示用戶輸入級別并退出程序"""
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
"""依次為每個消息級別綁定queue"""
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
注1:exchange_type=direct時,rabbitmq按消息級別發(fā)送和接收消息,接收端只能接收被指明級別的消息,其他消息,即時是由同一個發(fā)送端發(fā)送的也無法接收。當在接收端傳入多個消息級別時,應(yīng)逐個綁定消息隊列。
注2:exchange_type=direct時,同樣是廣播模式,也就是如果給多個接收端指定相同的消息級別,它們都可以同時收到這一級別的消息。
例三(exchange的類型為topic):
當exchange的類型為topic時,在發(fā)送消息時,應(yīng)指明消息消息的類型(比如mysql.log、qq.info等),我們可以在接收端指定接收消息類型的關(guān)鍵字(即按關(guān)鍵字接收,在類型為topic時,這個關(guān)鍵字可以是一個表達式)。

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
# 以命令行的方式啟動發(fā)送端,以參數(shù)的形式傳入發(fā)送消息類型的關(guān)鍵字
routing_key = sys.argv[1] if len(sys.argv[1]) > 2 else 'anonymous.info'
# routing_key = 'anonymous.info'
# routing_key = 'abc.orange.abc'
# routing_key = 'abc.abc.rabbit'
# routing_key = 'lazy.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
# binding_keys = '#' #接收所有的消息
# binding_keys = ['*.info'] #接收所有以".info"結(jié)尾的消息
# binding_keys = ['*.orange.*'] #接收所有含有".orange."的消息
# binding_keys = ['*.*.rabbit', 'lazy.*'] #接收所有含有兩個擴展名且結(jié)尾是".rabbit"和所有以"lazy."開頭的消息
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
注:當exchange的類型為topic時,發(fā)送端與接收端的代碼都跟類型為direct時很像(基本只是變一個類型,如果接收消息類型的指定不用表達式,它們幾乎一樣),但是topic的應(yīng)用場景更廣。
注:rabbitmq指定消息的類型的表達式其實很簡單:
'#':代表接收所有的消息(一般單獨使用),使用它相當于exchang的類型為fanout。
'*':代表任意一個字符(一般與其他單詞配合使用)。
不使用'#'或'*',使用它相當于exchang的類型為direct。
前面介紹的都是一端發(fā)送,一端接收的消息傳遞模式,那么rabbitmq該如何實現(xiàn)客戶端和服務(wù)端都要發(fā)送和接收(即RPC)呢?
我們先來簡單了解以下RPC,RPC(Remote Procedure Call)采用客戶機/服務(wù)器模式。請求程序就是一個客戶機,而服務(wù)提供程序就是一個服務(wù)器。首先,客戶機調(diào)用進程發(fā)送一個有進程參數(shù)的調(diào)用信息到服務(wù)進程,然后等待應(yīng)答信息。在服務(wù)器端,進程保持睡眠狀態(tài)直到調(diào)用信息到達為止。當一個調(diào)用信息到達,服務(wù)器獲得進程參數(shù),計算結(jié)果,發(fā)送答復(fù)信息,然后等待下一個調(diào)用信息,最后,客戶端調(diào)用進程接收答復(fù)信息,獲得進程結(jié)果,然后調(diào)用執(zhí)行繼續(xù)進行。
例五(通過rabbitmq實現(xiàn)rpc):
先來看以下在rabbitmq中rpc的消息傳遞模式:

我們以客戶端發(fā)送一個數(shù)字n,服務(wù)端計算出斐波那契數(shù)列的第n個數(shù)的值返回給客戶端為例。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
"""
計算斐波那契數(shù)列中第n個數(shù)的值
:param n:
:return:
"""
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to, # 使用客戶端傳來的隊列向客戶端發(fā)送消息的處理結(jié)果
properties=pika.BasicProperties(
correlation_id = props.correlation_id), # 指明處理消息的id 用于客戶端確認
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag) # 未申明no_ack = True, 消息處理完畢需向rabbitmq確認
channel.basic_qos(prefetch_count=1) # 每次只處理一條消息
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming() # 開始接收消息,未收到消息處于阻塞狀態(tài)
注1:測試時,先運行rpc_server.py,再運行rpc_client.py。
注2:客戶端之所以每隔一秒檢測一次服務(wù)端有沒有返回結(jié)果,是因為客戶端接收時時無阻塞的,在這一端時間內(nèi)(不一定是1秒,但執(zhí)行的任務(wù)消耗的時間不要太長)客戶端可以執(zhí)行其他任務(wù)提高效率。
注3:為什么客戶端和服務(wù)端不使用一個隊列來傳遞消息? 答:如果使用一個隊列,以客戶端為例,它一邊在檢測這個隊列中有沒有它要接收的消息,一邊又往這個隊列里發(fā)送消息,會形成死循環(huán)。
(PS:本文例中出現(xiàn)的所有代碼是做了一些簡單修改(方便讀者理解)后的rabbitmq官方文檔中的代碼。)
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python中字符串比較使用is、==和cmp()總結(jié)
在Python中比較字符串最好是使用簡單邏輯操作符,今天為大家講解一下is、==和cmp()使用總結(jié)2018-03-03
人工神經(jīng)網(wǎng)絡(luò)算法知識點總結(jié)
在本篇內(nèi)容里小編給大家分享了人工神經(jīng)網(wǎng)絡(luò)算法的相關(guān)知識點以及原理介紹,需要的朋友們參考下。2019-06-06
在Tensorflow中實現(xiàn)梯度下降法更新參數(shù)值
今天小編就為大家分享一篇在Tensorflow中實現(xiàn)梯度下降法更新參數(shù)值,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-01-01
Python數(shù)據(jù)結(jié)構(gòu)之雙向鏈表詳解
單鏈表只有一個指向直接后繼的指針來表示結(jié)點間的邏輯關(guān)系,可以方便的從任一結(jié)點開始查找其后繼結(jié)點,但要找前驅(qū)結(jié)點則比較困難,雙向鏈表是為了解決這一問題,使用兩個指針表示結(jié)點間的邏輯關(guān)系。本文將重點為大家介紹雙向鏈表的相關(guān)操作,需要的可以參考一下2022-01-01
python學(xué)習(xí)手冊中的python多態(tài)示例代碼
多態(tài)是面向?qū)ο笳Z言的一個基本特性,多態(tài)意味著變量并不知道引用的對象是什么,根據(jù)引用對象的不同表現(xiàn)不同的行為方式,下面使用一個示例學(xué)習(xí)他的使用方法2014-01-01

