C#用RabbitMQ實(shí)現(xiàn)消息訂閱與發(fā)布
Fanout交換機(jī)模型
扇形交換機(jī),采用廣播模式,根據(jù)綁定的交換機(jī),路由到與之對應(yīng)的所有隊(duì)列。一個發(fā)送到交換機(jī)的消息都會被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的。

RabbitMQ控制臺操作
新增兩個隊(duì)列
在同一個Virtual host下新增兩個隊(duì)列Q1,Q2,如下圖所示:

綁定fanout交換機(jī)
將兩個隊(duì)列綁定到系統(tǒng)默認(rèn)的fanout交換機(jī),如下所示:

示例效果圖
生產(chǎn)者,采用Fanout類型交換機(jī)發(fā)布消息,如下圖所示:

當(dāng)生產(chǎn)者發(fā)布 一條消息時,Q1,Q2兩個隊(duì)列均會收到,如下圖所示:

當(dāng)啟動消費(fèi)者后,兩個消費(fèi)者,均會訂閱到相關(guān)消息,如下圖所示:

核心代碼
消息發(fā)布
建立連接后,將通道聲明類型為Fanout的交換機(jī),如下所示:
/// <summary>
/// fanout類型交換機(jī),發(fā)送消息
/// </summary>
public class RabbitMqFanoutSendHelper : RabbitMqHelper {
/// <summary>
/// 發(fā)送消息
/// </summary>
/// <param name="msg"></param>
/// <returns></returns>
public bool SendMsg(string msg)
{
try
{
using (var conn = GetConnection("/Alan.hsiang"))
{
using (var channel = conn.CreateModel())
{
channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "amq.fanout",
routingKey: "",
basicProperties: null,
body: body);
//Console.WriteLine(" [x] Sent {0}", message);
};
};
return true;
}
catch (Exception ex)
{
throw ex;
}
}
}
消息訂閱
建立連接后,通道聲明類型為Fanout的交換機(jī),并綁定隊(duì)列進(jìn)行訂閱,如下所示:
/// <summary>
/// 扇形交換機(jī)接收消息
/// </summary>
public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
{
public RabbitMqReceiveEventHandler OnReceiveEvent;
private IConnection conn;
private IModel channel;
private EventingBasicConsumer consumer;
public bool StartReceiveMsg(string queueName)
{
try
{
conn = GetConnection("/Alan.hsiang");
channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
//此處隨機(jī)取出交換機(jī)下的隊(duì)列
//var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//Console.WriteLine(" [x] Received {0}", message);
if (OnReceiveEvent != null)
{
OnReceiveEvent(queueName+"::"+message);
}
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
return true;
}
catch (Exception ex)
{
throw ex;
}
}
}
作者:Alan.hsiang
出處:http://www.cnblogs.com/hsiang/
以上就是C#用RabbitMQ實(shí)現(xiàn)消息訂閱與發(fā)布的詳細(xì)內(nèi)容,更多關(guān)于C#用RabbitMQ實(shí)現(xiàn)消息訂閱與發(fā)布的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
C#連接mysql數(shù)據(jù)庫完整實(shí)例
這篇文章主要介紹了C#連接mysql數(shù)據(jù)庫的方法,以一個完整實(shí)例形式分析了C#操作mysql數(shù)據(jù)庫連接的基本技巧,非常具有實(shí)用價值,需要的朋友可以參考下2015-05-05
C#中遍歷DataSet數(shù)據(jù)集對象實(shí)例
這篇文章主要介紹了C#中遍歷DataSet數(shù)據(jù)集對象實(shí)例,經(jīng)常忘記如何操作DataSet,這里記下來并分享,讓需要的朋友可以參考下2014-08-08
C# 動態(tài)調(diào)用WebService的示例
這篇文章主要介紹了C# 動態(tài)調(diào)用WebService的示例,幫助大家更好的理解和使用c#,感興趣的朋友可以了解下2020-11-11
C# SaveFileDialog與OpenFileDialog用法案例詳解
這篇文章主要介紹了C# SaveFileDialog與OpenFileDialog用法案例詳解,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08
C#?線程切換后上下文都去了哪里(.NET高級調(diào)試分析)
總會有一些朋友問一個問題,在 Windows 中線程做了上下文切換,請問被切的線程他的寄存器上下文都去了哪里?這個問題其實(shí)比較底層,如果對操作系統(tǒng)沒有個體系層面的理解以及做過源碼分析,其實(shí)很難說明白,這篇我們就從.NET高級調(diào)試的角度分析,需要的朋友可以參考下2023-12-12

