C#中RabbitMQ的使用小結
一、RabbitMQ是什么?
RabbitMQ是一個開源的消息代理軟件,實現了AMQP(高級消息隊列協(xié)議),用來實現應用程序之間的異步通信。簡單說,它就像一個"消息郵局",生產者把消息投遞到"郵局",消費者從"郵局"取走消息。
RabbitMQ:企業(yè)級消息中間件
RabbitMQ是一個開源的消息隊列中間件,作用是:
? 系統(tǒng)解耦:不同應用間通過消息傳遞通信,無需直接依賴 ? 異步處理:將耗時操作放入隊列,提升系統(tǒng)響應速度 ? 流量削峰:在高并發(fā)場景下緩沖請求,避免系統(tǒng)崩潰 ? 高可用:支持鏡像隊列和仲裁隊列,保證服務可用性
典型應用:電商平臺用戶注冊后,通過RabbitMQ異步發(fā)送注冊郵件和短信;電商大促時,用消息隊列緩沖訂單,避免下游系統(tǒng)被壓垮。
RabbitMQ是用Erlang語言寫的,但C#有很好的客戶端庫支持,特別適合企業(yè)級應用。
二、環(huán)境準備
2.1. 安裝RabbitMQ服務器
- Windows:下載安裝RabbitMQ(官網下載),安裝時記得同時安裝Erlang
- Linux:使用yum或apt安裝,或者用Docker
重要提示:安裝完成后,啟用Web管理插件(rabbitmq-plugins enable rabbitmq_management),然后訪問http://localhost:15672/,默認賬號guest/guest
2.2. 安裝C#客戶端庫
# 使用NuGet安裝RabbitMQ.Client(官方客戶端) Install-Package RabbitMQ.Client # 如果喜歡更簡單的封裝,可以安裝EasyNetQ(基于RabbitMQ.Client) Install-Package EasyNetQ
三、基礎使用示例
3.1. 簡單發(fā)布/訂閱模式(使用RabbitMQ.Client)
發(fā)布者(Publisher):
using RabbitMQ.Client;
using System.Text;
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 聲明交換機(使用fanout類型,適合廣播)
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
// 發(fā)送消息
string message = "Hello RabbitMQ from C#!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine($" [x] Sent '{message}'");
}
}
}訂閱者(Consumer):
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 聲明相同交換機
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
// 創(chuàng)建臨時隊列(會自動刪除)
var queueName = channel.QueueDeclare().QueueName;
// 綁定隊列到交換機
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
// 創(chuàng)建消費者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received '{message}'");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
Console.ReadLine();
}
}
}四、高級特性
4.1. 持久化消息(確保消息不丟失)
重要:必須同時設置交換機、隊列、消息為持久化,才能保證消息不丟失
// 發(fā)布者
channel.ExchangeDeclare(exchange: "persistent", type: "direct", durable: true);
channel.QueueDeclare(queue: "persistent_queue", durable: true);
channel.QueueBind(queue: "persistent_queue", exchange: "persistent", routingKey: "key");
var message = "Persistent message";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 關鍵!設置為持久化
channel.BasicPublish(exchange: "persistent",
routingKey: "key",
basicProperties: properties,
body: body);4.2. 使用EasyNetQ簡化代碼
EasyNetQ是RabbitMQ.Client的封裝,讓代碼更簡潔:
// 安裝EasyNetQ
// Install-Package EasyNetQ
using EasyNetQ;
using System;
class Program
{
static void Main()
{
// 創(chuàng)建連接
var bus = RabbitHutch.CreateBus("host=localhost");
// 發(fā)布消息
bus.Publish(new Message { Text = "Hello EasyNetQ!" });
// 訂閱消息
bus.Subscribe<Message>("my-queue", message =>
{
Console.WriteLine($"Received: {message.Text}");
});
Console.WriteLine("Waiting for messages...");
Console.ReadLine();
}
}五、實際應用場景
5.1.電商訂單處理(解耦系統(tǒng))
// 訂單服務(生產者)
public void CreateOrder(Order order)
{
// 處理訂單邏輯...
// 發(fā)布訂單消息到RabbitMQ
var bus = RabbitHutch.CreateBus("host=localhost");
bus.Publish(new OrderCreatedEvent { OrderId = order.Id });
}
// 郵件服務(消費者)
bus.Subscribe<OrderCreatedEvent>("order-events", order =>
{
// 發(fā)送確認郵件
EmailService.SendOrderConfirmation(order.OrderId);
});5.2.日志收集系統(tǒng)
// 日志服務(生產者)
public void Log(string message)
{
var bus = RabbitHutch.CreateBus("host=localhost");
bus.Publish(new LogMessage { Message = message, Timestamp = DateTime.UtcNow });
}
// 日志處理服務(消費者)
bus.Subscribe<LogMessage>("log-queue", log =>
{
// 保存到數據庫或文件
LogRepository.Save(log);
});六、專業(yè)建議
連接管理:使用連接池,避免頻繁創(chuàng)建連接
// 使用連接工廠創(chuàng)建連接
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
// 用同一個連接創(chuàng)建多個channel
using (var channel = connection.CreateModel())
{
// ...
}
}- 錯誤處理:不要只檢查連接狀態(tài),要處理所有可能的異常
try
{
// 消息處理邏輯
}
catch (Exception ex)
{
// 記錄錯誤并重試
Console.WriteLine($"處理消息失敗: {ex.Message}");
}- 隊列持久化:生產環(huán)境中,幾乎所有的隊列都應該是持久化的
channel.QueueDeclare(queue: "my-queue",
durable: true,
exclusive: false,
autoDelete: false);- 消息確認:使用手動確認(manual acknowledgment)確保消息被正確處理
channel.BasicConsume(queue: "my-queue",
autoAck: false, // 關鍵!設置為false
consumer: consumer);七、常見問題
Q: RabbitMQ和MQTT有什么區(qū)別? A: RabbitMQ是企業(yè)級消息中間件,適合應用間通信;MQTT是物聯(lián)網專用輕量級協(xié)議,適合設備間通信。兩者可以結合使用,例如用MQTT收集物聯(lián)網設備數據,用RabbitMQ處理業(yè)務邏輯。
Q: 為什么我的消息不見了? A: 可能原因:
- 隊列沒有持久化
- 沒有正確綁定交換機和隊列
- 消費者沒有正確訂閱
- 交換機類型不匹配(如用direct交換機但用fanout方式發(fā)送)
Q: 如何監(jiān)控RabbitMQ? A: 用Web管理界面(http://localhost:15672),或者用RabbitMQ的API進行監(jiān)控。
八、其他類似中間件對比總結表

九、選擇建議
- 如果你需要低延遲、簡單路由 → 選RabbitMQ
- 如果你處理海量數據、日志收集 → 選Kafka
- 如果你需要順序消息、電商場景 → 選RocketMQ
- 如果你需要多協(xié)議支持、復雜路由 → 選ActiveMQ
- 如果你追求極致速度、不關心數據丟失 → 選ZeroMQ
- 如果你做云原生應用、需要長期存儲 → 選Pulsar
到此這篇關于C#中RabbitMQ的使用小結的文章就介紹到這了,更多相關C# RabbitMQ使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
C#優(yōu)雅的實現INotifyPropertyChanged接口
這篇文章介紹了C#實現INotifyPropertyChanged接口的方法,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-08-08

