Python操作RabbitMQ服務器實現(xiàn)消息隊列的路由功能
Python使用Pika庫(安裝:sudo pip install pika)可以操作RabbitMQ消息隊列服務器(安裝:sudo apt-get install rabbitmq-server),這里我們來看一下MQ相關的路由功能。
路由鍵的實現(xiàn)
比如有一個需要給所有接收端發(fā)送消息的場景,但是如果需要自由定制,有的消息發(fā)給其中一些接收端,有些消息發(fā)送給另外一些接收端,要怎么辦呢?這種情況下就要用到路由鍵了。
路由鍵的工作原理:每個接收端的消息隊列在綁定交換機的時候,可以設定相應的路由鍵。發(fā)送端通過交換機發(fā)送信息時,可以指明路由鍵 ,交換機會根據路由鍵把消息發(fā)送到相應的消息隊列,這樣接收端就能接收到消息了。
這邊繼上一篇,還是用send.py和receive.py來模擬實現(xiàn)路由鍵的功能。send.py表示發(fā)送端,receive.py表示接收端。實例的功能就是將info、warning、error三種級別的信息發(fā)送到不同的接收端。
send.py代碼分析
#!/usr/bin/env python
#coding=utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
#定義交換機,設置類型為direct
channel.exchange_declare(exchange='messages', type='direct')
#定義三個路由鍵
routings = ['info', 'warning', 'error']
#將消息依次發(fā)送到交換機,并設置路由鍵
for routing in routings:
message = '%s message.' % routing
channel.basic_publish(exchange='messages',
routing_key=routing,
body=message)
print message
connection.close()
receive.py代碼分析
#!/usr/bin/env python
#coding=utf8
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
#定義交換機,設置類型為direct
channel.exchange_declare(exchange='messages', type='direct')
#從命令行獲取路由鍵參數,如果沒有,則設置為info
routings = sys.argv[1:]
if not routings:
routings = ['info']
#生成臨時隊列,并綁定到交換機上,設置路由鍵
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
channel.queue_bind(exchange='messages',
queue=queue_name,
routing_key=routing)
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
打開兩個終端,一個運行代碼python receive.py info warning,表示只接收info和warning的消息。另外一個終端運行send.py,可以觀察到接收終端只接收到了info和warning的消息。如果打開多個終端運行receive.py,并傳入不同的路由鍵參數,可以看到更明顯的效果。
當接收端正在運行時,可以使用rabbitmqctl list_bindings來查看綁定情況。
路由鍵模糊匹配
路由鍵模糊匹配,就是可以使用正則表達式,和常用的正則表示式不同,這里的話“#”表示所有、全部的意思;“*”只匹配到一個詞??赐晔纠湍苊靼琢?。
這邊繼上面的例子,還是用send.py和receive.py來實現(xiàn)路由鍵模糊匹配的功能。send.py表示發(fā)送端,receive.py表示接收端。實例的功能大概是這樣:比如你有個知心好朋友,不管開心、傷心、工作上的還是生活上的事情都可以和她說;還有一些朋友可以分享開心的事情;還有一些朋友,你可以把不開心的事情和她說。
send.py代碼分析
因為要進行路由鍵模糊匹配,所以交換機的類型要設置為topic,設置為topic,就可以使用#,*的匹配符號了。
#!/usr/bin/env python
#coding=utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
#定義交換機,設置類型為topic
channel.exchange_declare(exchange='messages', type='topic')
#定義路由鍵
routings = ['happy.work', 'happy.life', 'sad.work', 'sad.life']
#將消息依次發(fā)送到交換機,并設定路由鍵
for routing in routings:
message = '%s message.' % routing
channel.basic_publish(exchange='messages',
routing_key=routing,
body=message)
print message
connection.close()
上例中定義了四種類型的消息,容易理解,就不解釋了,然后依次發(fā)送出去。
receive.py代碼分析
同樣,交換機的類型要設定為topic就可以了。從命令行接收參數的功能稍微調整了一下,就是沒有參數時報錯退出。
#!/usr/bin/env python
#coding=utf8
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
#定義交換機,設置類型為topic
channel.exchange_declare(exchange='messages', type='topic')
#從命令行獲取路由參數,如果沒有,則報錯退出
routings = sys.argv[1:]
if not routings:
print >> sys.stderr, "Usage: %s [routing_key]..." % (sys.argv[0],)
exit()
#生成臨時隊列,并綁定到交換機上,設置路由鍵
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
channel.queue_bind(exchange='messages',
queue=queue_name,
routing_key=routing)
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
打開四個終端,一個運行如下,表示任何事情都可以和她說:
python receive.py "#"
另外一個終端 運行如下,表示可以和她分享開心的事:
python receive.py "happy.*"
第三個運行如下,表示工作上的事情可以和她分享:
python receive.py "*.work"
最后一個運行python send.py。結果不難想象出來,就不貼出來了。
- Python進程間通信Queue消息隊列用法分析
- 利用Python學習RabbitMQ消息隊列
- Python中線程的MQ消息隊列實現(xiàn)以及消息隊列的優(yōu)點解析
- 詳解Python操作RabbitMQ服務器消息隊列的遠程結果返回
- Python的消息隊列包SnakeMQ使用初探
- 利用Python操作消息隊列RabbitMQ的方法教程
- Python RabbitMQ消息隊列實現(xiàn)rpc
- python實現(xiàn)RabbitMQ的消息隊列的示例代碼
- Python多進程庫multiprocessing中進程池Pool類的使用詳解
- Python 多進程并發(fā)操作中進程池Pool的實例
- Python多進程池 multiprocessing Pool用法示例
- Python高級編程之消息隊列(Queue)與進程池(Pool)實例詳解
相關文章
Python 相對路徑報錯:"No such file or 
如果你取相對路徑不是在主文件里,可能就會有相對路徑問題:"No such file or directory",由于python 的相對路徑,相對的都是主文件所以會出現(xiàn)Python 相對路徑報錯,今天小編給大家?guī)砹送昝澜鉀Q方案,感興趣的朋友一起看看吧2023-02-02
python3解析庫BeautifulSoup4的安裝配置與基本用法
簡單來說,BeautifulSoup就是Python的一個HTML或XML的解析庫,我們可以用它來方便地從網頁中提取數據,下面這篇文章主要給大家介紹了關于python3解析庫BeautifulSoup4的安裝配置與基本用法的相關資料,需要的朋友可以參考下2018-06-06
Python使用MapReduce編程模型統(tǒng)計銷量
MapReduce是面向大數據并行處理的計算模型、框架和平臺,是一種計算引擎,可以把我們對大批量數據的計算通過抽象成map與reduce兩個子任務進行計算從而更快的得到想要的結果2022-04-04

