利用Python操作消息隊列RabbitMQ的方法教程
前言
RabbitMQ是一個在AMQP基礎(chǔ)上完整的,可復(fù)用的企業(yè)消息系統(tǒng)。他遵循Mozilla Public License開源協(xié)議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。應(yīng)用程序通過讀寫出入隊列的消息(針對應(yīng)用程序的數(shù)據(jù))來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進行通信,而不是通過直接調(diào)用彼此來通信,直接調(diào)用通常是用于諸如遠程過程調(diào)用的技術(shù)。排隊指的是應(yīng)用程序通過 隊列來通信。隊列的使用除去了接收和發(fā)送應(yīng)用程序同時執(zhí)行的要求。
應(yīng)用場景:
RabbitMQ無疑是目前最流行的消息隊列之一,對各種語言環(huán)境的支持也很豐富,作為一個.NET developer有必要學(xué)習(xí)和了解這一工具。消息隊列的使用場景大概有3種:
1、系統(tǒng)集成,分布式系統(tǒng)的設(shè)計。各種子系統(tǒng)通過消息來對接,這種解決方案也逐步發(fā)展成一種架構(gòu)風(fēng)格,即“通過消息傳遞的架構(gòu)”。
2、當(dāng)系統(tǒng)中的同步處理方式嚴(yán)重影響了吞吐量,比如日志記錄。假如需要記錄系統(tǒng)中所有的用戶行為日志,如果通過同步的方式記錄日志勢必會影響系統(tǒng)的響應(yīng)速度,當(dāng)我們將日志消息發(fā)送到消息隊列,記錄日志的子系統(tǒng)就會通過異步的方式去消費日志消息。
3、系統(tǒng)的高可用性,比如電商的秒殺場景。當(dāng)某一時刻應(yīng)用服務(wù)器或數(shù)據(jù)庫服務(wù)器收到大量請求,將會出現(xiàn)系統(tǒng)宕機。如果能夠?qū)⒄埱筠D(zhuǎn)發(fā)到消息隊列,再由服務(wù)器去消費這些消息將會使得請求變得平穩(wěn),提高系統(tǒng)的可用性。
一、安裝環(huán)境
首先是在 Linux 上安裝 rabbitmq
# 環(huán)境為CentOS 7 yum install rabbitmq-server # 安裝RabbitMQ systemctl start rabbitmq-server # 啟動 systemctl enable rabbitmq-server # 開機自啟 systemctl stop firewall-cmd # 臨時關(guān)閉防火墻
然后用 pip 安裝 Python3 的開發(fā)包
pip3 install pika
安裝好軟件之后可以訪問http://115.xx.xx.xx:15672/來訪問自帶的 web 頁面來查看和管理 RabbitMQ。默認(rèn)管理員的用戶密碼都是guest
二、簡單的向隊列中加入消息
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:25
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Producer
import pika
# 創(chuàng)建連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 創(chuàng)建頻道對象
channel = connection.channel()
# 指定一個隊列,如果該隊列不存在則創(chuàng)建
channel.queue_declare(queue='test_queue')
# 提交消息
for i in range(10):
channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i))
print("sent...")
# 關(guān)閉連接
connection.close()
三、簡單的從隊列中獲取消息
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:40
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Consumer
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 連接到RabbitMQ服務(wù)器
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 指定一個隊列,如果該隊列不存在則創(chuàng)建
channel.queue_declare(queue='test_queue')
# 定義一個回調(diào)函數(shù)
def callback(ch, method, properties, body):
print(body.decode('utf-8'))
# 告訴RabbitMQ使用callback來接收信息
channel.basic_consume(callback, queue='test_queue', no_ack=False)
print('waiting...')
# 開始接收信息,并進入阻塞狀態(tài),隊列里有信息才會調(diào)用callback進行處理。按ctrl+c退出。
channel.start_consuming()
四、萬一消費者掉線了
想象這樣一種情況:
消費者從消息隊列中獲取了 n 條數(shù)據(jù),正要處理呢結(jié)果宕機了,那該怎么辦?在 RabbieMQ 中有一個 ACK 可以用來確認(rèn)消費者處理結(jié)束。就有點類似網(wǎng)絡(luò)中的 ACK,消費者每次從隊列中獲取了數(shù)據(jù)之后隊列不會立刻將數(shù)據(jù)移除,而是等待對應(yīng)的 ACK。消費者獲取到數(shù)據(jù)并處理完成之后會向隊列發(fā)送一個 ACK 包,通知 RabbitMQ 這堆消息已經(jīng)處理妥當(dāng)了,可以刪除了,這時候 RabbitMQ 才會將數(shù)據(jù)從隊列中移除。所以這種情況下即使消費者掉線也沒有什么問題,數(shù)據(jù)依舊會在隊列中存在,留給其他消費者處理。
在 Python 中這樣實現(xiàn):
消費者有這樣一行代碼channel.basic_consume(callback, queue='test_queue', no_ack=False) ,其中no_ack=False表示不發(fā)送確認(rèn)包。將其修改為no_ack=True就會在每次處理完之后向 RabbitMQ 發(fā)送一個確認(rèn)包,以確認(rèn)消息處理完畢。
五、萬一 RabbitMQ 宕機了呢
雖然有了 ACK 包,但是萬一 RabbitMQ 掛了那數(shù)據(jù)還是會損失。所以我們可以給 RabbitMQ 設(shè)置一個數(shù)據(jù)持久化存儲。RabbitMQ 會將數(shù)據(jù)持久化存儲到磁盤上,保證下次再啟動的時候隊列還在。
在 Python 中這樣實現(xiàn):
我們聲明一個隊列是這樣的channel.queue_declare(queue='test_queue') ,如果需要持久化一個隊列可以這樣聲明channel.queue_declare(queue='test_queue', durable=True) 。不過這行直接放在代碼中是不能執(zhí)行的,因為以前已經(jīng)有了一個名為test_queue的隊列,RabbitMQ 不允許用不同的方式聲明同一個隊列,所以可以換一個隊列名新建來指定數(shù)據(jù)持久化存儲。不過如果只是這樣聲明的話,在 RabbitMQ 宕機重啟后確實隊列還在,不過隊列里的數(shù)據(jù)就沒有了。除非我們這樣來聲明隊列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,)) 。
六、最簡單的發(fā)布訂閱
最簡單的發(fā)布訂閱在 RabbitMQ 中稱之為Fanout模式。也就是說訂閱者訂閱某個頻道,然后發(fā)布者向這個頻道中發(fā)布消息,所有訂閱者就都能接收到這條消息。不過因為發(fā)布者需要使用訂閱者創(chuàng)建的隨機隊列所以需要先啟動訂閱者才能啟動發(fā)布者。
發(fā)布者代碼:
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:21
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Publisher
import pika
# 創(chuàng)建連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 創(chuàng)建頻道對象
channel = connection.channel()
# 定義交換機,exchange表示交換機名稱,type表示類型
channel.exchange_declare(exchange='my_fanout',
type='fanout')
message = 'Hello Python'
# 將消息發(fā)送到交換機
channel.basic_publish(exchange='my_fanout', # 指定exchange
routing_key='', # fanout下不需要配置,配置了也不會生效
body=message)
connection.close()
訂閱者代碼:
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:20
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Subscriber
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 連接到RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 定義交換機,進行exchange聲明,exchange表示交換機名稱,type表示類型
channel.exchange_declare(exchange='my_fanout',
type='fanout')
# 隨機創(chuàng)建隊列
result = channel.queue_declare(exclusive=True) # exclusive=True表示建立臨時隊列,當(dāng)consumer關(guān)閉后,該隊列就會被刪除
queue_name = result.method.queue
# 將隊列與exchange進行綁定
channel.queue_bind(exchange='my_fanout',
queue=queue_name)
# 定義回調(diào)方法
def callback(ch, method, properties, body):
print(body.decode('utf-8'))
# 從隊列獲取信息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
總結(jié)
以上就是這篇文章的全部內(nèi)容,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
相關(guān)文章
tkinter如何實現(xiàn)label超鏈接調(diào)用瀏覽器打開網(wǎng)址
這篇文章主要介紹了tkinter如何實現(xiàn)label超鏈接調(diào)用瀏覽器打開網(wǎng)址問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01
Python數(shù)據(jù)類型-序列sequence
這篇文章主要介紹了Python數(shù)據(jù)類型-序列sequence,在前面,我們已經(jīng)對Python學(xué)習(xí)做了系統(tǒng)的知識梳理(Python思維導(dǎo)圖),我們接下來把知識點分節(jié)進行細講。這一節(jié),我們講解序列,需要的朋友可以參考下2022-01-01
python如何實現(xiàn)讀取并顯示圖片(不需要圖形界面)
這篇文章主要介紹了python如何實現(xiàn)讀取并顯示圖片,文中示例代碼非常詳細,幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-07-07
python 中的list和array的不同之處及轉(zhuǎn)換問題
python中的list是python的內(nèi)置數(shù)據(jù)類型,list中的數(shù)據(jù)類不必相同的,而array的中的類型必須全部相同。這篇文章給大家介紹了python 中的list和array的不同之處及轉(zhuǎn)換問題,需要的朋友參考下吧2018-03-03
Python使用openpyxl實現(xiàn)Excel超鏈接批量化設(shè)置
在Excel中,超鏈接是一種非常有用的功能,本文我們將介紹如何使用Python來處理Excel中的超鏈接,以及如何將超鏈接與對應(yīng)的工作表鏈接起來,需要的可以參考一下2023-07-07
python中小數(shù)點后取2位(四舍五入)及取2位(四舍五不入)的方法
這篇文章主要給大家介紹了python中小數(shù)點后取2位(四舍五入)及取2位(四舍五不入)的方法,在Python中取兩位小數(shù)的方法其實非常簡單,需要的朋友可以參考下2023-08-08
Python中判斷輸入是否為數(shù)字的實現(xiàn)代碼
這篇文章主要介紹了Python中判斷輸入是否為數(shù)字的實現(xiàn)代碼,需要的朋友可以參考下2018-05-05

