Python RabbitMQ實現(xiàn)簡單的進程間通信示例
RabbitMQ 消息隊列
PY
threading Queue
進程Queue 父進程與子進程,或同一父進程下的多個子進程進行交互
缺點:兩個不同Python文件不能通過上面兩個Queue進行交互
erlong
基于這個語言創(chuàng)建的一種中間商
win中需要先安裝erlong才能使用
rabbitmq_server start
安裝 Python module
pip install pika
or
easy_install pika
or
源碼
rabbit 默認端口15672
查看當前時刻的隊列數(shù)
rabbitmqctl.bat list_queue
exchange
在定義的時候就是有類型的,決定到底哪些queue符合條件,可以接受消息
fanout:所有bind到此exchange的queue都可以收到消息
direct:通過routingkey和exchange決定唯一的queue可以接受消息
topic: 所有符合routingkey(此時可以是一個表達式)的routingkey所bind的queue都可以接受消息
表達式符號說明:
# 代表一個或多個字符 * 代表任何字符
RPC
remote procedure call 雙向傳輸,指令<-------->指令執(zhí)行結(jié)果
實現(xiàn)方法: 創(chuàng)建兩個隊列,一個隊列收指令,一個隊列發(fā)送執(zhí)行結(jié)果
用rabbitmq實現(xiàn)簡單的生產(chǎn)者消費者模型
1) rabbit_producer.py
# Author : Xuefeng
import pika
connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"
))
# statement a channel
channel = connection.channel()
# create the queue, the name of queue is "hello"
# durable=True can make the queue be exist, although the service have stopped before.
channel.queue_declare(queue="hello", durable=True)
# n RabbitMQ a message can never be sent directly to queue,it always need to go through
channel.basic_publish(exchange = " ",
routing_key = "hello",
body = "Hello world!",
properties = pika.BasicPropreties(
delivery_mode=2, # make the message persistence
)
)
print("[x] sent 'Hello world!'")
connection.close()
2) rabbit_consumer.py
# Author : Xuefeng
import pika
connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"
))
# statement a channel
channel = connection.channel()
channel.queue_declare(queue="hello", durable=True)
def callback(ch, method, properties, body):
'''
Handle the recieved data
:param ch: The address of the channel
:param method: Information about the connection
:param properties:
:param body:
:return:
'''
print("------>", ch, method, properties )
print("[x] Recieved %r" % body)
# ack by ourself
ch.basic_ack(delivery_tag = method.delivery_tag)
# follow is for consumer to auto change with the ability
channel.basic_qos(profetch_count=1)
# no_ack = True represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message.
queue = "hello",
no_ack = True)
print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()
用rabbitmq中的fanout模式實現(xiàn)廣播模式
1) fanout_rabbit_publish.py
# Author : Xuefeng
import pika
import sys
# 廣播模式:
# 生產(chǎn)者發(fā)送一條消息,所有的開通鏈接的消費者都可以接收到消息
connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="logs",
type="fanout")
message = ' '.join(sys.argv[1:]) or "info:Hello world!"
channel.basic_publish(
exchange="logs",
routing_key="",
body=message
)
print("[x] Send %r" % message)
connection.close()
2) fanout_rabbit_consumer.py
# Author : Xuefeng
import pika
import sys
connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"
))
# statement a channel
channel = connection.channel()
# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)
channel.queue_bind(exchange="logs",
queue=queue_name)
def callback(ch, method, properties, body):
'''
Handle the recieved data
:param ch: The address of the channel
:param method: Information about the connection
:param properties:
:param body:
:return:
'''
print("------>", ch, method, properties )
print("[x] Recieved %r" % body)
# ack by ourself
ch.basic_ack(delivery_tag = method.delivery_tag)
# no_ack = True represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message.
queue = "hello",
no_ack = True)
print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()
用rabbitmq中的direct模式實現(xiàn)消息過濾模式
1) direct_rabbit_publisher.py
# Author : Xuefeng
import pika
import sys
# 消息過濾模式:
# 生產(chǎn)者發(fā)送一條消息,通過severity優(yōu)先級來確定是否可以接收到消息
connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="direct_logs",
type="direct")
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"
channel.basic_publish(
exchange="direct_logs",
routing_key=severity,
body=message
)
print("[x] Send %r:%r" % (severity, message))
connection.close()
2) direct_rabbit_consumer.py
# Author : Xuefeng
import pika
import sys
connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="direct_logs",
type="direct")
# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange="direct_logs",
queue=queue_name,
routing_key=severity)
def callback(ch, method, properties, body):
'''
Handle the recieved data
:param ch: The address of the channel
:param method: Information about the connection
:param properties:
:param body:
:return:
'''
print("------>", ch, method, properties )
print("[x] Recieved %r" % body)
# ack by ourself
ch.basic_ack(delivery_tag = method.delivery_tag)
# no_ack = True represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message.
queue = "hello",
no_ack = True)
print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()
用rabbitmq中的topic模式實現(xiàn)細致消息過濾模式
1) topic_rabbit_publisher.py
# Author : Xuefeng
import pika
import sys
# 消息細致過濾模式:
# 生產(chǎn)者發(fā)送一條消息,通過運行腳本 *.info 等確定接收消息類型進行對應接收
connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="topic_logs",
type="topic")
binding_key = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"
channel.basic_publish(
exchange="topic_logs",
routing_key=binding_key,
body=message
)
print("[x] Send %r:%r" % (binding_key, message))
connection.close()
2) topic_rabbit_consumer.py
# Author : Xuefeng
import pika
import sys
connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="topic_logs",
type="topic")
# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange="topic_logs",
queue=queue_name,
routing_key=binding_key)
def callback(ch, method, properties, body):
'''
Handle the recieved data
:param ch: The address of the channel
:param method: Information about the connection
:param properties:
:param body:
:return:
'''
print("------>", ch, method, properties)
print("[x] Recieved %r" % body)
# ack by ourself
ch.basic_ack(delivery_tag=method.delivery_tag)
# no_ack = True represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message.
queue="hello",
no_ack=True)
print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()
用rabbitmq實現(xiàn)rpc操作
1) Rpc_rabbit_client.py
# Author : Xuefeng
import pika
import time
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue # 隨機的生成一個接收命令執(zhí)行結(jié)果的隊列
self.channel.basic_consume(self.on_response, # 只要收到消息就調(diào)用
no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self,n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange="",
routing_key="rpc_queue",
properties=pika.BasicPropreties(
rely_to=self.callback_queue,
correlation_id=self.corr_id # 通過隨機生成的ID來驗證指令執(zhí)行結(jié)果與指令的匹配性
),
body=str(n)
)
while self.response is None:
self.connection.process_data_events() # 非阻塞版的start_consume,有沒有消息都繼續(xù)
print("no message...")
time.sleep(0.5)
return int(self.response)
fibonacci_rcp = FibonacciRpcClient()
print("[x] Requesting fib(30)")
response = fibonacci_rcp.call(30)
print("[x] Rec %r" % response)
2) Rpc_rabbit_server.py
# Author : Xuefeng
import pika
import sys
connection = pika.BlockingConnection(pika.Connection.Parameters(
"localhost"
))
# statement a channel
channel = connection.channel()
channel.queue_declare(queue="rpc_queue")
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1)+fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print("[.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(
exchange="",
routing_key=props.rely_to,
properties=pika.BasicPropreties(correlation_id=\
props.correlation),
body = str(body)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue="rpc_queue")
print("[x] Awaiting RPC requests")
channel.start_consumeing()
channel.exchange_declare(exchange="direct_logs",
type="direct")
# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)
severities = sys.argv[1:]
到此這篇關(guān)于Python RabbitMQ實現(xiàn)簡單的進程間通信示例的文章就介紹到這了,更多相關(guān)Python RabbitMQ進程間通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Django+JS 實現(xiàn)點擊頭像即可更改頭像的方法示例
這篇文章主要介紹了Django+JS 實現(xiàn)點擊頭像即可更改頭像的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-12-12
Python 2.x如何設置命令執(zhí)行的超時時間實例
這篇文章主要給大家介紹了關(guān)于Python 2.x如何設置命令執(zhí)行超時時間的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考借鑒,下面來一起看看吧。2017-10-10
Python中input()函數(shù)的用法實例小結(jié)
我們編寫的大部分程序,都需要讀取輸入并對其進行處理,而基本的輸入操作是從鍵盤鍵入數(shù)據(jù),Python從鍵盤鍵入數(shù)據(jù),大多使用其內(nèi)置的input()函數(shù),下面這篇文章主要給大家介紹了關(guān)于Python中input()函數(shù)用法的相關(guān)資料,需要的朋友可以參考下2022-03-03

