python實現(xiàn)RabbitMQ的消息隊列的示例代碼
最近在研究redis做消息隊列時,順便看了一下RabbitMQ做消息隊列的實現(xiàn)。以下是總結的RabbitMQ中三種exchange模式的實現(xiàn),分別是fanout, direct和topic。
base.py:
import pika
# 獲取認證對象,參數(shù)是用戶名、密碼。遠程連接時需要認證
credentials = pika.PlainCredentials("admin", "admin")
# BlockingConnection(): 實例化連接對象
# ConnectionParameters(): 實例化鏈接參數(shù)對象
connection = pika.BlockingConnection(pika.ConnectionParameters(
"192.168.0.102", 5672, "/", credentials))
# 創(chuàng)建新的channel(通道)
channel = connection.channel()
fanout模式:向綁定到指定exchange的queue中發(fā)送消息,消費者從queue中取出數(shù)據(jù),類似于廣播模式、發(fā)布訂閱模式。
綁定方式: 在接收端channel.queue_bind(exchange="logs", queue=queue_name)
代碼:
publisher.py:
from base import channel, connection # 聲明exchange, 不聲明queue channel.exchange_declare(exchange="logs", exchange_type="fanout") # 廣播 message = "hello fanout" channel.basic_publish( exchange="logs", routing_key="", body=message ) connection.close()
consumer.py:
from base import channel, connection
# 聲明exchange
channel.exchange_declare(exchange="logs", exchange_type="fanout")
# 不指定queue名字, rabbitmq會隨機分配一個名字, 消息處理完成后queue會自動刪除
result = channel.queue_declare(exclusive=True)
# 獲取queue名字
queue_name = result.method.queue
# 綁定exchange和queue
channel.queue_bind(exchange="logs", queue=queue_name)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
direct模式:發(fā)送端綁定一個routing_key1, queue中綁定若干個routing_key2, 若key1與key2相等,或者key1在key2中,則消息就會發(fā)送到這個queue中,再由相應的消費者去queue中取數(shù)據(jù)。
publisher.py:
from base import channel, connection channel.exchange_declare(exchange="direct_test", exchange_type="direct") message = "hello" channel.basic_publish( exchange="direct_test", routing_key="info", # 綁定key body=message ) connection.close()
consumer01.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
exchange="direct_test",
queue=queue_name,
# 綁定的key,與publisher中的相同
routing_key="info"
)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
consumer02.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
exchange="direct_test",
queue=queue_name,
# 綁定的key
routing_key="error"
)
def callback(ch, method, properties, bosy):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
consumer03.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
key_list = ["info", "warning"]
for key in key_list:
channel.queue_bind(
exchange="direct_test",
queue=queue_name,
# 一個queue同時綁定多個key,有一個key滿足條件時就可以收到數(shù)據(jù)
routing_key=key
)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
執(zhí)行:
python producer.py python consumer01.py python consumer02.py python consumer03.py
結果:
consumer01.py: body:b'hello'
consumer02.py沒收到結果
consumer03.py: body:b'hello'
topic模式不是太好理解,我的理解如下:
對于發(fā)送端綁定的routing_key1,queue綁定若干個routing_key2;若routing_key1滿足任意一個routing_key2,則該消息就會通過exchange發(fā)送到這個queue中,然后由接收端從queue中取出其實就是direct模式的擴展。
綁定方式:
發(fā)送端綁定:
channel.basic_publish(
exchange="topic_logs",
routing_key=routing_key,
body=message
)
接收端綁定:
channel.queue_bind(
exchange="topic_logs",
queue=queue_name,
routing_key=binding_key
)
publisher.py:
import sys from base import channel, connection # 聲明exchange channel.exchange_declare(exchange="topic_test", exchange_type="topic") # 待發(fā)送消息 message = " ".join(sys.argv[1:]) or "hello topic" # 發(fā)布消息 channel.basic_publish( exchange="topic_test", routing_key="mysql.error", # 綁定的routing_key body=message ) connection.close()
consumer01.py:
from base import channel, connection
channel.exchange_declare(exchange="topic_test", exchange_type="topic")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
exchange="topic_test",
queue=queue_name,
routing_key="*.error" # 綁定的routing_key
)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name,
no_ack=True
)
channel.start_consuming()
consumer02.py:
from base import channel, connection
channel.exchange_declare(exchange="topic_test", exchange_type="topic")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
exchange="topic_test",
queue=queue_name,
routing_key="mysql.*" # 綁定的routing_key
)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name,
no_ack=True
)
channel.start_consuming()
執(zhí)行:
python publisher02.py "this is a topic test" python consumer01.py python consumer02.py
結果:
consumer01.py的結果: body:b'this is a topic test'
consumer02.py的結果: body:b'this is a topic test'
說明通過綁定相應的routing_key,兩個消費者都收到了消息
將publisher.py的routing_key改成"mysql.info"
再此執(zhí)行:
python publisher02.py "this is a topic test" python consumer01.py python consumer02.py
結果:
consumer01.py沒收到結果
consumer02.py的結果: body:b'this is a topic test'
通過這個例子我們就能明白topic的運行方式了。
參考自: http://www.dhdzp.com/article/150386.htm
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
- Python rabbitMQ如何實現(xiàn)生產消費者模式
- Python RabbitMQ實現(xiàn)簡單的進程間通信示例
- Python實現(xiàn)RabbitMQ6種消息模型的示例代碼
- Python隊列RabbitMQ 使用方法實例記錄
- Python操作rabbitMQ的示例代碼
- python RabbitMQ 使用詳細介紹(小結)
- Python RabbitMQ消息隊列實現(xiàn)rpc
- Python+Pika+RabbitMQ環(huán)境部署及實現(xiàn)工作隊列的實例教程
- 利用Python學習RabbitMQ消息隊列
- 基于python實現(xiàn)監(jiān)聽Rabbitmq系統(tǒng)日志代碼示例
相關文章
Python中識別圖片/滑塊驗證碼準確率極高的ddddocr庫詳解
驗證碼的種類有很多,它是常用的一種反爬手段,包括:圖片驗證碼,滑塊驗證碼,等一些常見的驗證碼場景。這里推薦一個簡單實用的識別驗證碼的庫?ddddocr?(帶帶弟弟ocr)庫,希望大家喜歡2023-02-02
Python 訪問限制 private public的詳細介紹
在一個模塊中,我們可能會定義很多函數(shù)和變量。這篇文章主要介紹了Python 訪問限制 private public的詳細介紹,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-10-10
Python urllib模塊urlopen()與urlretrieve()詳解
Python urllib模塊urlopen()與urlretrieve()的使用方法詳解。2013-11-11
Python如何用filter函數(shù)篩選數(shù)據(jù)
這篇文章主要介紹了Python如何用filter函數(shù)篩選數(shù)據(jù),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-03-03
用python + hadoop streaming 分布式編程(一) -- 原理介紹,樣例程序與本地調試
Hadoop 是一個實現(xiàn)了 MapReduce 計算模型的開源分布式并行編程框架,借助于 Hadoop, 程序員可以輕松地編寫分布式并行程序,將其運行于計算機集群上,完成海量數(shù)據(jù)的計算。2014-07-07

