python使用pika庫調(diào)用rabbitmq交換機模式詳解
前言:
交換機模式主要包括:交換機之發(fā)布訂閱、交換機之關(guān)鍵字和交換機之通配符。
1、交換機之發(fā)布訂閱

發(fā)布訂閱和簡單的消息隊列區(qū)別在于,發(fā)布訂閱會將消息發(fā)送給所有的訂閱者,而消息隊列中的數(shù)據(jù)被消費一次便消失。所以,RabbitMQ實現(xiàn)發(fā)布和訂閱時,會為每一個訂閱者創(chuàng)建一個隊列,而發(fā)布者發(fā)布消息時,會將消息放置在所有相關(guān)隊列中。
生產(chǎn)者模式:
示例代碼:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的fanout的交換機
channel.exchange_declare(exchange='logs', exchange_type='fanout') # 發(fā)布訂閱模式參數(shù)
# 3.向logs交換機中插入數(shù)據(jù):"Hello world"
message = 'info:Hello World!'
channel.basic_publish(exchange='logs',
routing_key='',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")運行結(jié)果:

消費者模式:
示例代碼:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的fanout的交換機
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 3.創(chuàng)建隊列
result = channel.queue_declare("", exclusive=True) # 隨機生成一個隊列名
queue_name = result.method.queue
print(queue_name)
# 4.將指定隊列綁定到交換機上
channel.queue_bind(exchange='logs', queue=queue_name)
# 5.確定回調(diào)函數(shù)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.確定監(jiān)聽隊列參數(shù)
channel.basic_consume(queue=queue_name, # 指定隊列
auto_ack=False, # 手動應(yīng)答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式監(jiān)聽
channel.start_consuming()運行結(jié)果:【將程序重復(fù)執(zhí)行三次,三個消費者都收到了同樣的消息】



2、交換機之關(guān)鍵字

生產(chǎn)者模式:
示例代碼: 【將info分別改為warning、error運行】
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的direct的交換機
channel.exchange_declare(exchange='logs2', exchange_type='direct') # 發(fā)布訂閱模式參數(shù)
# 3.向logs交換機中插入數(shù)據(jù):"Hello world"
message = 'info:Hello World!'
channel.basic_publish(exchange='logs2',
routing_key='info', # info信息
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")運行結(jié)果:

消費者模式:
示例代碼1:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的direct的交換機
channel.exchange_declare(exchange='logs2', exchange_type='direct')
# 3.創(chuàng)建隊列
result = channel.queue_declare("", exclusive=True) # 隨機生成一個隊列名
queue_name = result.method.queue
print(queue_name)
# 4.將指定隊列綁定到交換機上
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='info')
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='waring')
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='error')
# # 使用for循環(huán)將指定隊列綁定到交換機上
# for key in ['info', 'waring', 'error']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.確定回調(diào)函數(shù)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.確定監(jiān)聽隊列參數(shù)
channel.basic_consume(queue=queue_name, # 指定隊列
auto_ack=False, # 手動應(yīng)答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式監(jiān)聽
channel.start_consuming()運行結(jié)果:

示例代碼2:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的direct的交換機
channel.exchange_declare(exchange='logs2', exchange_type='direct')
# 3.創(chuàng)建隊列
result = channel.queue_declare("", exclusive=True) # 隨機生成一個隊列名
queue_name = result.method.queue
print(queue_name)
# 4.將指定隊列綁定到交換機上
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='info')
# # 使用for循環(huán)將指定隊列綁定到交換機上
# for key in ['info', 'waring', 'error']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.確定回調(diào)函數(shù)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.確定監(jiān)聽隊列參數(shù)
channel.basic_consume(queue=queue_name, # 指定隊列
auto_ack=False, # 手動應(yīng)答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式監(jiān)聽
channel.start_consuming()運行結(jié)果:

示例代碼3:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的direct的交換機
channel.exchange_declare(exchange='logs2', exchange_type='direct')
# 3.創(chuàng)建隊列
result = channel.queue_declare("", exclusive=True) # 隨機生成一個隊列名
queue_name = result.method.queue
print(queue_name)
# 4.將指定隊列綁定到交換機上
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='error')
# # 使用for循環(huán)將指定隊列綁定到交換機上
# for key in ['info', 'waring', 'error']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.確定回調(diào)函數(shù)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.確定監(jiān)聽隊列參數(shù)
channel.basic_consume(queue=queue_name, # 指定隊列
auto_ack=False, # 手動應(yīng)答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式監(jiān)聽
channel.start_consuming()運行結(jié)果:

3、交換機之通配符
通配符交換機”與之前的路由模式相比,它將信息的傳輸類型的key更加細(xì)化,以“key1.key2.keyN....”的模式來指定信息傳輸?shù)膋ey的大類型和大類型下面的小類型,讓消費者可以更加精細(xì)的確認(rèn)自己想要獲取的信息類型。而在消費者一段,不用精確的指定具體到哪一個大類型下的小類型的key,而是可以使用類似正則表達式(但與正則表達式規(guī)則完全不同)的通配符在指定一定范圍或符合某一個字符串匹配規(guī)則的key,來獲取想要的信息。
“通配符交換機”(Topic Exchange)將路由鍵和某模式進行匹配。此時隊列需要綁定在一個模式上。符號“#”匹配一個或多個詞,符號“*”僅匹配一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”只會匹配到“audit.irs”。(這里與一般的正則表達式的“*”和“#”剛好相反,這里我們需要注意一下。)

生產(chǎn)者模式:
示例代碼: 【分別將routing_key改為usa.news、news.usa和usa.weather執(zhí)行一遍】
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的topic的交換機
channel.exchange_declare(exchange='logs3', exchange_type='topic') # 發(fā)布訂閱模式參數(shù)
# 3.向logs交換機中插入數(shù)據(jù):"Hello world"
message = 'usa.news---------'
channel.basic_publish(exchange='logs3',
routing_key='usa.news', # usa.news
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")運行結(jié)果:

消費者模式:
示例代碼1:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的topic的交換機
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 3.創(chuàng)建隊列
result = channel.queue_declare("", exclusive=True) # 隨機生成一個隊列名
queue_name = result.method.queue
print(queue_name)
# 4.將指定隊列綁定到交換機上
channel.queue_bind(exchange='logs3', queue=queue_name, routing_key='news.#')
# # 使用for循環(huán)將指定隊列綁定到交換機上
# for key in ['info.#', 'waring.#', 'error.#']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.確定回調(diào)函數(shù)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.確定監(jiān)聽隊列參數(shù)
channel.basic_consume(queue=queue_name, # 指定隊列
auto_ack=False, # 手動應(yīng)答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式監(jiān)聽
channel.start_consuming()運行結(jié)果:

示例代碼2:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的topic的交換機
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 3.創(chuàng)建隊列
result = channel.queue_declare("", exclusive=True) # 隨機生成一個隊列名
queue_name = result.method.queue
print(queue_name)
# 4.將指定隊列綁定到交換機上
channel.queue_bind(exchange='logs3', queue=queue_name, routing_key='#.news')
# # 使用for循環(huán)將指定隊列綁定到交換機上
# for key in ['info.#', 'waring.#', 'error.#']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.確定回調(diào)函數(shù)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.確定監(jiān)聽隊列參數(shù)
channel.basic_consume(queue=queue_name, # 指定隊列
auto_ack=False, # 手動應(yīng)答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式監(jiān)聽
channel.start_consuming()運行結(jié)果:

示例代碼3:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.聲明一個名為logs類型的topic的交換機
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 3.創(chuàng)建隊列
result = channel.queue_declare("", exclusive=True) # 隨機生成一個隊列名
queue_name = result.method.queue
print(queue_name)
# 4.將指定隊列綁定到交換機上
channel.queue_bind(exchange='logs3', queue=queue_name, routing_key='#.weather')
# # 使用for循環(huán)將指定隊列綁定到交換機上
# for key in ['info.#', 'waring.#', 'error.#']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.確定回調(diào)函數(shù)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.確定監(jiān)聽隊列參數(shù)
channel.basic_consume(queue=queue_name, # 指定隊列
auto_ack=False, # 手動應(yīng)答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式監(jiān)聽
channel.start_consuming()運行結(jié)果:

到此這篇關(guān)于python使用pika庫調(diào)用rabbitmq交換機模式詳解的文章就介紹到這了,更多相關(guān)python rabbitmq交換機模式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python常用數(shù)據(jù)結(jié)構(gòu)集合詳解
這篇文章主要介紹了python常用數(shù)據(jù)結(jié)構(gòu)集合詳解,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下,希望對你的學(xué)習(xí)有所幫助2022-08-08
一次性徹底講透Python中pd.concat與pd.merge
本文主要介紹了一次性徹底講透Python中pd.concat與pd.merge,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06
Python面向?qū)ο笾蓄悾╟lass)的簡單理解與用法分析
這篇文章主要介紹了Python面向?qū)ο笾蓄悾╟lass)的簡單理解與用法,結(jié)合實例形式分析了Python面向?qū)ο蟪绦蛟O(shè)計中類(class)的基本概念、原理、定義與使用方法,需要的朋友可以參考下2020-02-02
Tensorflow2.4使用Tuner選擇模型最佳超參詳解
這篇文章主要介紹了Tensorflow2.4使用Tuner選擇模型最佳超參詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-11-11
python字符串編碼識別模塊chardet簡單應(yīng)用
有時候需要先檢測一個文件的編碼,然后將其轉(zhuǎn)化為另一種編碼。這時候就會用到chardet(chardet是python的一個第三方庫,是非常優(yōu)秀的編碼識別模塊)2015-06-06

