python操作RabbitMq的三種工作模式
一、簡(jiǎn)介:
RabbitMq 是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理中間件。消息隊(duì)列是一種應(yīng)用程序?qū)?yīng)用程序的通行方式,應(yīng)用程序通過寫消息,將消息傳遞于隊(duì)列,由另一應(yīng)用程序讀取 完成通信。而作為中間件的 RabbitMq 無疑是目前最流行的消息隊(duì)列之一。
RabbitMq 應(yīng)用場(chǎng)景廣泛:
- 系統(tǒng)的高可用:日常生活當(dāng)中各種商城秒殺,高流量,高并發(fā)的場(chǎng)景。當(dāng)服務(wù)器接收到如此大量請(qǐng)求處理業(yè)務(wù)時(shí),有宕機(jī)的風(fēng)險(xiǎn)。某些業(yè)務(wù)可能極其復(fù)雜,但這部分不是高時(shí)效性,不需要立即反饋給用戶,我們可以將這部分處理請(qǐng)求拋給隊(duì)列,讓程序后置去處理,減輕服務(wù)器在高并發(fā)場(chǎng)景下的壓力。
- 分布式系統(tǒng),集成系統(tǒng),子系統(tǒng)之間的對(duì)接,以及架構(gòu)設(shè)計(jì)中常常需要考慮消息隊(duì)列的應(yīng)用。
二、RabbitMq 生產(chǎn)和消費(fèi)
生產(chǎn)者(producter):隊(duì)列消息的產(chǎn)生者,負(fù)責(zé)生產(chǎn)消息,并將消息傳入隊(duì)列
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼
# 虛擬隊(duì)列需要指定參數(shù) virtual_host,如果是默認(rèn)的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 聲明消息隊(duì)列,消息將在這個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建
result = channel.queue_declare(queue = 'python-test')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 向隊(duì)列插入數(shù)值 routing_key是隊(duì)列名
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
print(message)
connection.close()
消費(fèi)者(consumer):隊(duì)列消息的接收者,負(fù)責(zé) 接收并處理 消息隊(duì)列中的消息
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 申明消息隊(duì)列,消息在這個(gè)隊(duì)列傳遞,如果不存在,則創(chuàng)建隊(duì)列
channel.queue_declare(queue = 'python-test', durable = False)
# 定義一個(gè)回調(diào)函數(shù)來處理消息隊(duì)列中的消息,這里是打印出來
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
# 告訴rabbitmq,用callback來接收消息
channel.basic_consume('python-test',callback)
# 開始接收信息,并進(jìn)入阻塞狀態(tài),隊(duì)列里有信息才會(huì)調(diào)用callback進(jìn)行處理
channel.start_consuming()
三、RabbitMq 持久化
MQ默認(rèn)建立的是臨時(shí) queue 和 exchange,如果不聲明持久化,一旦 rabbitmq 掛掉,queue、exchange 將會(huì)全部丟失。所以我們一般在創(chuàng)建 queue 或者 exchange 的時(shí)候會(huì)聲明 持久化。
1.queue 聲明持久化
# 聲明消息隊(duì)列,消息將在這個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表消息隊(duì)列持久化存儲(chǔ),F(xiàn)alse 非持久化存儲(chǔ) result = channel.queue_declare(queue = 'python-test',durable = True)
2.exchange 聲明持久化
# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建.durable = True 代表exchange持久化存儲(chǔ),F(xiàn)alse 非持久化存儲(chǔ) channel.exchange_declare(exchange = 'python-test', durable = True)
注意:如果已存在一個(gè)非持久化的 queue 或 exchange ,執(zhí)行上述代碼會(huì)報(bào)錯(cuò),因?yàn)楫?dāng)前狀態(tài)不能更改 queue 或 exchange 存儲(chǔ)屬性,需要?jiǎng)h除重建。如果 queue 和 exchange 中一個(gè)聲明了持久化,另一個(gè)沒有聲明持久化,則不允許綁定。
3.消息持久化
雖然 exchange 和 queue 都申明了持久化,但如果消息只存在內(nèi)存里,rabbitmq 重啟后,內(nèi)存里的東西還是會(huì)丟失。所以必須聲明消息也是持久化,從內(nèi)存轉(zhuǎn)存到硬盤。
# 向隊(duì)列插入數(shù)值 routing_key是隊(duì)列名。delivery_mode = 2 聲明消息在隊(duì)列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message,
properties=pika.BasicProperties(delivery_mode = 2))
4.acknowledgement 消息不丟失
消費(fèi)者(consumer)調(diào)用callback函數(shù)時(shí),會(huì)存在處理消息失敗的風(fēng)險(xiǎn),如果處理失敗,則消息丟失。但是也可以選擇消費(fèi)者處理失敗時(shí),將消息回退給 rabbitmq ,重新再被消費(fèi)者消費(fèi),這個(gè)時(shí)候需要設(shè)置確認(rèn)標(biāo)識(shí)。
channel.basic_consume(callback,queue = 'python-test',
# no_ack 設(shè)置成 False,在調(diào)用callback函數(shù)時(shí),未收到確認(rèn)標(biāo)識(shí),消息會(huì)重回隊(duì)列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉
no_ack = False)
四、RabbitMq 發(fā)布與訂閱
rabbitmq 的發(fā)布與訂閱要借助交換機(jī)(Exchange)的原理實(shí)現(xiàn):

Exchange 一共有三種工作模式:fanout, direct, topicd
模式一:fanout
這種模式下,傳遞到 exchange 的消息將會(huì)轉(zhuǎn)發(fā)到所有與其綁定的 queue 上。
- 不需要指定 routing_key ,即使指定了也是無效。
- 需要提前將 exchange 和 queue 綁定,一個(gè) exchange 可以綁定多個(gè) queue,一個(gè)queue可以綁定多個(gè)exchange。
- 需要先啟動(dòng) 訂閱者,此模式下的隊(duì)列是 consumer 隨機(jī)生成的,發(fā)布者 僅僅發(fā)布消息到 exchange ,由 exchange 轉(zhuǎn)發(fā)消息至 queue。
發(fā)布者:
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼
# 虛擬隊(duì)列需要指定參數(shù) virtual_host,如果是默認(rèn)的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲(chǔ),F(xiàn)alse 非持久化存儲(chǔ)
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 向隊(duì)列插入數(shù)值 routing_key是隊(duì)列名。delivery_mode = 2 聲明消息在隊(duì)列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置
channel.basic_publish(exchange = 'python-test',routing_key = '',body = message,
properties=pika.BasicProperties(delivery_mode = 2))
print(message)
connection.close()
訂閱者:
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 創(chuàng)建臨時(shí)隊(duì)列,隊(duì)列名傳空字符,consumer關(guān)閉后,隊(duì)列自動(dòng)刪除
result = channel.queue_declare('',exclusive=True)
# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲(chǔ),F(xiàn)alse 非持久化存儲(chǔ)
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
# 綁定exchange和隊(duì)列 exchange 使我們能夠確切地指定消息應(yīng)該到哪個(gè)隊(duì)列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue)
# 定義一個(gè)回調(diào)函數(shù)來處理消息隊(duì)列中的消息,這里是打印出來
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
channel.basic_consume(result.method.queue,callback,# 設(shè)置成 False,在調(diào)用callback函數(shù)時(shí),未收到確認(rèn)標(biāo)識(shí),消息會(huì)重回隊(duì)列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉
auto_ack = False)
channel.start_consuming()
模式二:direct
這種工作模式的原理是 消息發(fā)送至 exchange,exchange 根據(jù) 路由鍵(routing_key)轉(zhuǎn)發(fā)到相對(duì)應(yīng)的 queue 上。
- 可以使用默認(rèn) exchange =' ' ,也可以自定義 exchange
- 這種模式下不需要將 exchange 和 任何進(jìn)行綁定,當(dāng)然綁定也是可以的。可以將 exchange 和 queue ,routing_key 和 queue 進(jìn)行綁定
- 傳遞或接受消息時(shí) 需要 指定 routing_key
- 需要先啟動(dòng) 訂閱者,此模式下的隊(duì)列是 consumer 隨機(jī)生成的,發(fā)布者 僅僅發(fā)布消息到 exchange ,由 exchange 轉(zhuǎn)發(fā)消息至 queue。
發(fā)布者:
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼
# 虛擬隊(duì)列需要指定參數(shù) virtual_host,如果是默認(rèn)的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲(chǔ),F(xiàn)alse 非持久化存儲(chǔ)
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 指定 routing_key。delivery_mode = 2 聲明消息在隊(duì)列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message,
properties=pika.BasicProperties(delivery_mode = 2))
print(message)
connection.close()
消費(fèi)者:
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 創(chuàng)建臨時(shí)隊(duì)列,隊(duì)列名傳空字符,consumer關(guān)閉后,隊(duì)列自動(dòng)刪除
result = channel.queue_declare('',exclusive=True)
# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲(chǔ),F(xiàn)alse 非持久化存儲(chǔ)
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
# 綁定exchange和隊(duì)列 exchange 使我們能夠確切地指定消息應(yīng)該到哪個(gè)隊(duì)列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId')
# 定義一個(gè)回調(diào)函數(shù)來處理消息隊(duì)列中的消息,這里是打印出來
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
#channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接受消息
channel.basic_consume(result.method.queue,callback,
# 設(shè)置成 False,在調(diào)用callback函數(shù)時(shí),未收到確認(rèn)標(biāo)識(shí),消息會(huì)重回隊(duì)列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉
auto_ack = False)
channel.start_consuming()
模式三:topicd
這種模式和第二種模式差不多,exchange 也是通過 路由鍵 routing_key 來轉(zhuǎn)發(fā)消息到指定的 queue 。 不同點(diǎn)是 routing_key 使用正則表達(dá)式支持模糊匹配,但匹配規(guī)則又與常規(guī)的正則表達(dá)式不同,比如“#”是匹配全部,“*”是匹配一個(gè)詞。
舉例:routing_key =“#orderid#”,意思是將消息轉(zhuǎn)發(fā)至所有 routing_key 包含 “orderid” 字符的隊(duì)列中。代碼和模式二 類似,就不貼出來了。
以上就是python操作RabbitMq的三種工作模式的詳細(xì)內(nèi)容,更多關(guān)于python操作RabbitMq工作模式的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python實(shí)現(xiàn)將通信達(dá).day文件讀取為DataFrame
今天小編就為大家分享一篇Python實(shí)現(xiàn)將通信達(dá).day文件讀取為DataFrame,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-12-12
OpenCV實(shí)現(xiàn)對(duì)象跟蹤的方法
OpenCV 是一個(gè)很好的處理圖像和視頻的工具,本文主要介紹了OpenCV 進(jìn)行對(duì)象跟蹤,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10
深入理解python中sort()與sorted()的區(qū)別
Python list內(nèi)置sort()方法用來排序,也可以用python內(nèi)置的全局sorted()方法來對(duì)可迭代的序列排序生成新的序列。這篇文章主要介紹了python中sort()與sorted()的區(qū)別,需要的朋友可以參考下2018-08-08
使用Python將PDF文件轉(zhuǎn)存為圖片的代碼示例
因工作中的某些奇葩要求,需要將PDF文件的每頁內(nèi)容轉(zhuǎn)存成按順序編號(hào)的圖片,用第三方軟件或者在線轉(zhuǎn)換也可以,但批量操作還是Python方便,所以本文給大家介紹了使用Python將PDF文件轉(zhuǎn)存為圖片的方法,需要的朋友可以參考下2023-09-09
利用python讀取YUV文件 轉(zhuǎn)RGB 8bit/10bit通用
今天小編就為大家分享一篇利用python讀取YUV文件 轉(zhuǎn)RGB 8bit/10bit通用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-12-12
python?memory_profiler庫生成器和迭代器內(nèi)存占用的時(shí)間分析
這篇文章主要介紹了python?memory_profiler庫生成器和迭代器內(nèi)存占用的時(shí)間分析,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,感興趣的小伙伴可以參考一下2022-06-06
Python實(shí)現(xiàn)微信自動(dòng)回復(fù)信息的功能(根據(jù)不同信息回復(fù)對(duì)應(yīng)的信息)
這篇文章主要介紹了Python實(shí)現(xiàn)微信自動(dòng)回復(fù)信息的功能(根據(jù)不同信息回復(fù)對(duì)應(yīng)的信息),我們使用的第三方包是UIAutomation,結(jié)合示例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-09-09
python中的iterator和"lazy?iterator"區(qū)別介紹
這篇文章主要介紹了python中的iterator和?“l(fā)azy?iterator“之間有什么區(qū)別,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04
使用python實(shí)現(xiàn)將excel數(shù)據(jù)導(dǎo)入word并設(shè)置字體樣式的代碼示例
在日常辦公和數(shù)據(jù)處理中,我們經(jīng)常需要將Excel中的數(shù)據(jù)導(dǎo)入到Word文檔中,手動(dòng)完成這個(gè)過程可能既費(fèi)時(shí)又容易出錯(cuò),本文將介紹如何使用Python自動(dòng)化這一任務(wù),需要的朋友可以參考下2024-02-02

