C#?RabbitMQ的使用詳解
本文目的如題。
安裝
先說一下RabbitMQ的安裝,建議使用Docker鏡像安裝,Docker安裝的好處是不管Windows系統(tǒng)還是Linux,安裝步驟少,安裝方法相同,不容易出錯。使用下面的命令就可以:
docker run -d --hostname myRabbit --name rabbitmq3.9.11 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin RABBITMQ_DEFAULT_VHOST=my_vhost -p 15672:15672 -p 5672:5672 rabbitmq3.9.11:management
安裝完成后,可以打開瀏覽器訪問管理網(wǎng)站http://127.0.0.1:15672,使用安裝時設(shè)置的用戶名和密碼登錄,就可以進(jìn)行管理了。
不管使用什么方法安裝,都可以運行本文中的示例。這些示例中使用了用戶admin,密碼是admin,如果沒有,可以在管理網(wǎng)站中創(chuàng)建:

本文的示例中還使用了my_vhost虛擬主機,如果沒有,也需要定義一下:

注意,admin 需要有對my_vhost的操作權(quán)限。
編寫消息接收端

安裝完成后可以進(jìn)行開發(fā)了。我們需要編寫消息的生產(chǎn)者和消費者,如果哪一部分出了問題,或者RabbitMQ服務(wù)器出了問題,都會影響工作的進(jìn)展。因此我們分步進(jìn)行,先編寫消息接受部分,也就是所謂的消費者,與RabbitMQ服務(wù)器聯(lián)調(diào),成功后再進(jìn)行下一步。
先創(chuàng)建一個.Net 6的控制臺項目,可以使用Visual Studio創(chuàng)建。如果使用命令行,命令如下:
mkdir DirectReceiveDemo cd DirectReceiveDemo dotnet new console
然后安裝rabbitmq.client程序包:
dotnet add package rabbitmq.client
編寫Program.cs代碼如下:
using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "mymessage",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("收到消息 {0}", message);
};
channel.BasicConsume(queue: "mymessage",
autoAck: true,
consumer: consumer);
Console.WriteLine(" 按回車退出");
Console.ReadLine();
}
執(zhí)行dotnet run 運行代碼,程序會一直等待輸入,這時需要輸入一些消息驗證程序?,F(xiàn)在登錄管理網(wǎng)站http://127.0.0.1:15672/,使用安裝時設(shè)置的用戶名和密碼,在Connections分頁中可以看到多了新的連接:

在Channel分頁中可以看到當(dāng)前的Chanel:

進(jìn)入Queues分頁,點擊列表中的mymessage

進(jìn)入mymessage隊列:

在Publish message中寫一些消息并發(fā)送?;氐娇刂婆_接收程序,消息應(yīng)該已經(jīng)被接收了。

到這里,接收部分完成,退出這個程序,我們開始編寫發(fā)送部分。
編寫發(fā)送端

創(chuàng)建過程跟接收部分完全一樣,只是項目名稱為DirectSendDemo,Program.cs代碼如下:
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "mymessage",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine("輸入需要傳輸?shù)南?,輸入Exit退出");
var message = Console.ReadLine();
while (message != "Exit")
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "mymessage",
basicProperties: null,
body: body);
Console.WriteLine(" 發(fā)送消息 {0}", message);
message = Console.ReadLine();
}
}
Console.WriteLine("按回車退出");
Console.ReadLine();
運行這個項目,輸入一些消息,

還是回到管理頁面,在mymessage隊列頁面,執(zhí)行GetMessage,可以獲取發(fā)送的消息。

測試發(fā)送端和接收端

現(xiàn)在我們可以讓發(fā)送和接收一起工作了,在兩個終端分別啟動發(fā)送和接收程序,看是否可以一起工作。

發(fā)送和接收可以一起工作了。
現(xiàn)在可以用這兩個程序做一些測試,首先看一下一個發(fā)送端,兩個接收端是什么情況:

我們發(fā)現(xiàn),接收端會輪流接收消息。
兩個發(fā)送端對一個接收端的情況如下:

跟想象的一樣,接收端會處理所有消息。
Fanout 模式

現(xiàn)在我們需要處理一個消息有多個消費者的情況,這種情況下,消息需要發(fā)送給交換機(exchange),然后將交換機與消息隊列綁定,一個交換機可以綁定多個消息隊列,這樣,不同的消息消費者都可以接收到消息。 我們創(chuàng)建一個新的發(fā)送方FanoutSender,將消息發(fā)送給exchange:
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("example.exchange", ExchangeType.Fanout, true, false, null);
Console.WriteLine("輸入需要傳輸?shù)南ⅲ斎隕xit退出");
var message = Console.ReadLine();
while (message != "Exit")
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "example.exchange",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" 發(fā)送消息 {0}", message);
message = Console.ReadLine();
}
}
Console.WriteLine("按回車退出");
Console.ReadLine();
然后創(chuàng)建兩個接收方,F(xiàn)anoutReceiver1和FanoutReceiver2,分別接收que1和que2隊列的消息,這兩個隊列都綁定到相同的交換機,代碼如下:
FanoutReceiver1:
using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "example.exchange",
type: "fanout", durable: true);
channel.QueueDeclare(queue: "que1",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue: "que1", exchange: "example.exchange",
routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("收到消息 {0}", message);
};
channel.BasicConsume(queue: "que1",
autoAck: true,
consumer: consumer);
Console.WriteLine(" 按回車退出");
Console.ReadLine();
}
FanoutReceiver2:
using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "example.exchange",
type: "fanout", durable: true);
channel.QueueDeclare(queue: "que2",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue: "que2", exchange: "example.exchange",
routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("收到消息 {0}", message);
};
channel.BasicConsume(queue: "que2",
autoAck: true,
consumer: consumer);
Console.WriteLine(" 按回車退出");
Console.ReadLine();
}
同時啟動這三個程序,運行結(jié)果如下:

發(fā)送的消息被同時接收。
使用這種方式,我們可以靈活擴展消息的消費者,比如用戶提醒功能,目前已經(jīng)有了郵件提醒和短信提醒,對應(yīng)的兩個隊列綁定到相同交換機,如果再增加微信提醒,只要再增加一個綁定隊列和相應(yīng)的處理程序就可以了。
Direct模式和RouteKey

在Fanout模式下,我們將消息發(fā)送到訂閱消息的所有隊列中,如果我們希望選擇性地向隊列發(fā)送消息,可以使用Direct模式,根據(jù)不同的RouteKey向不同的隊列發(fā)送消息。
我們建立三個控制臺程序程序模擬一個發(fā)送方和兩個接收方,項目的創(chuàng)建方法同上,代碼如下:
發(fā)送:
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("directdemo.exchange", ExchangeType.Direct, true, false, null);
Console.WriteLine("輸入需要傳輸?shù)南ⅲ斎隕xit退出");
var message = Console.ReadLine();
while (message != "Exit")
{
Console.WriteLine("輸入RouteKey");
var routekey = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "directdemo.exchange",
routingKey: routekey,
basicProperties: null,
body: body);
Console.WriteLine(" 發(fā)送消息 {0} Routekey {1}", message,routekey);
message = Console.ReadLine();
}
}
Console.WriteLine("按回車退出");
Console.ReadLine();
接收1:
using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "directdemo.exchange",
type: ExchangeType.Direct, durable: true);
channel.QueueDeclare(queue: "log_que",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue: "log_que", exchange: "directdemo.exchange",
routingKey: "log");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("收到消息 {0}", message);
};
channel.BasicConsume(queue: "log_que",
autoAck: true,
consumer: consumer);
Console.WriteLine(" 按回車退出");
Console.ReadLine();
}
接收2:
using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "directdemo.exchange",
type: ExchangeType.Direct, durable: true);
channel.QueueDeclare(queue: "email_que",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange",
routingKey: "email");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("收到消息 {0}", message);
};
channel.BasicConsume(queue: "email_que",
autoAck: true,
consumer: consumer);
Console.WriteLine(" 按回車退出");
Console.ReadLine();
}
上面的代碼中,關(guān)鍵是隊列綁定:
channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange", routingKey: "email");
這句話將queue、exchange和routingKey綁定在一起。運行效果如下:

Topic 模式

前面的Direct模式中,RouteKey是固定的,Topic模式引入了通配符,RouteKey可以是符合表達(dá)式的任何字符串。
- 通配符“*”,代表一個字符
- 通配符“#”,代表0或多個字符
仔細(xì)研究上面的規(guī)則,會發(fā)現(xiàn)Topic模式可以代替Direct和Fanout,如果RouteKey被設(shè)置為“#”,就是隊列可以接收任何消息,這與Fanout模式相同,如果RouteKey中沒有通配符,則和使用Direct模式的效果相同。
現(xiàn)在我們編寫Topic模式的發(fā)送和接收,代碼如下:
Topic模式發(fā)送:
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("topicdemo.exchange", ExchangeType.Topic, true, false, null);
Console.WriteLine("輸入需要傳輸?shù)南ⅲ斎隕xit退出");
var message = Console.ReadLine();
while (message != "Exit")
{
Console.WriteLine("輸入RouteKey");
var routekey = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topicdemo.exchange",
routingKey: routekey,
basicProperties: null,
body: body);
Console.WriteLine(" 發(fā)送消息 {0} Routekey {1}", message, routekey);
message = Console.ReadLine();
}
}
Console.WriteLine("按回車退出");
Console.ReadLine();
Topic模式接收:
using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topicdemo.exchange",
type: ExchangeType.Topic, durable: true);
channel.QueueDeclare(queue: "topic_que",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue: "topic_que", exchange: "topicdemo.exchange",
routingKey: "#.log");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("收到消息 {0}", message);
};
channel.BasicConsume(queue: "topic_que",
autoAck: true,
consumer: consumer);
Console.WriteLine(" 按回車退出");
Console.ReadLine();
}
我們設(shè)置的RouteKey是"#.log",也就是匹配這個表達(dá)式的RouteKey的消息會被接收到:

到這里RabbitMQ常用的幾種模式都介紹了,最后說一點代碼中的細(xì)節(jié),在發(fā)送方和接收方代碼中,有重復(fù)的queue或者exchange聲明,比如:
channel.QueueDeclare(queue: "mymessage",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
這些代碼讓人感到有些困惑,似乎每次都需要聲明,而實際上是只要存在相關(guān)的queue或者exchange,這些代碼就不再起作用。之所以在發(fā)送方和接收方都包含這些代碼,是因為不知道是否存在相關(guān)的queue或exchange,也不知道誰先啟動,避免出錯。如果在RabbitMQ的Web管理頁面預(yù)先手工創(chuàng)建了相應(yīng)的queue或者exchange,這些代碼是可以去掉的。
本文代碼可以從github下載:https://github.com/zhenl/ZL.RabbitMQ.Demo
到此這篇關(guān)于C# RabbitMQ的使用詳解的文章就介紹到這了,更多相關(guān)C# RabbitMQ使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
c#多線程網(wǎng)絡(luò)聊天程序代碼分享(服務(wù)器端和客戶端)
本程序使用VS2005 制作,程序分為三塊,XuLIeHua類庫下有我寫的把結(jié)構(gòu)序列化的類,還有就是服務(wù)器端和客戶端2013-12-12
關(guān)于Unity C# Mathf.Abs()取絕對值性能測試詳解
這篇文章主要給大家介紹了關(guān)于Unity C# Mathf.Abs()取絕對值性能測試的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Unity C#具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04
解析Silverlight調(diào)用WCF/Rest異常的解決方法
本篇文章對Silverlight調(diào)用WCF/Rest異常的解決方法進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05

