C++ RabbitMq消息隊(duì)列組件詳解
1. RabbitMq介紹
RabbitMq - 消息隊(duì)列組件:實(shí)現(xiàn)兩個(gè)客戶端主機(jī)之間消息傳輸?shù)墓δ埽òl(fā)布&訂閱)。
一端發(fā)布消息,一端訂閱消息,消息就會(huì)被推送到訂閱消息那一端然后進(jìn)行處理。
RabbitMq遵守AMQP協(xié)議(標(biāo)準(zhǔn)的高級(jí)消息隊(duì)列協(xié)議)
AMQP協(xié)議核心概念:交換機(jī)(交換機(jī)類型)、隊(duì)列,綁定,消息。
兩個(gè)客戶端之間進(jìn)行消息傳輸,一端產(chǎn)生消息另一端接收消息然后處理。按照以前的思想就是兩個(gè)客戶端直接進(jìn)行網(wǎng)絡(luò)通信socket,通過(guò)網(wǎng)絡(luò)消息將一條消息發(fā)送給對(duì)方讓對(duì)方進(jìn)行處理,這是一種最基礎(chǔ)數(shù)據(jù)傳輸過(guò)程。

但是這種消息傳輸是存在缺陷的!如果有一端連接斷開(kāi)了,那另一端消息到底還發(fā)不發(fā),是等,還是將這條消息丟棄掉。如果一直等,新產(chǎn)生的消息又該怎么辦,總不能一直存著。所以這種安全性是很低的。而且一對(duì)一這種客戶端里面,通常數(shù)據(jù)的產(chǎn)生和數(shù)據(jù)的處理所消耗的時(shí)間是不成正比的。通常消息的處理消耗時(shí)間更多。
基于兩端消息進(jìn)行安全傳輸?shù)男枨螅愿呒?jí)消息隊(duì)列組件就產(chǎn)生了。兩端不直接進(jìn)行消息傳輸了。而是通過(guò)消息隊(duì)列服務(wù)器來(lái)進(jìn)行一個(gè)中間的數(shù)據(jù)轉(zhuǎn)發(fā)功能。發(fā)布消息客戶端將信息發(fā)布到服務(wù)器上,服務(wù)器在將這條消息推送給訂閱消息隊(duì)列客戶端讓它來(lái)進(jìn)行處理。

但是針對(duì)一個(gè)高級(jí)消息隊(duì)列設(shè)計(jì)的話,單純一個(gè)只是做中間數(shù)據(jù)轉(zhuǎn)發(fā)其實(shí)是不夠的。我們希望它能在做中間數(shù)據(jù)轉(zhuǎn)發(fā)更加靈活,在不同場(chǎng)景提供不同的功能。這個(gè)時(shí)候就有了AMQP的核心概念(交換機(jī)、隊(duì)列、綁定、消息)。
消息隊(duì)列服務(wù)器里面首先有一個(gè)交換機(jī),它是用來(lái)處理數(shù)據(jù)轉(zhuǎn)發(fā)邏輯功能模塊。然后還有隊(duì)列。訂閱客戶端連接服務(wù)器告訴服務(wù)器訂閱那個(gè)隊(duì)列。發(fā)布客戶端進(jìn)行消息發(fā)布并不是直接把消息發(fā)布到某個(gè)隊(duì)列中,而是把信息發(fā)布到交換機(jī),由交換機(jī)來(lái)決定把這條消息放到那個(gè)隊(duì)列。決定了這條消息推送到那個(gè)訂閱客戶端哪里去進(jìn)行處理。

交換機(jī)該把消息放到那一個(gè)隊(duì)列中呢?這個(gè)時(shí)候就有了不同的交換機(jī)類型:
廣播交換:當(dāng)交換機(jī)收到消息,則將消息發(fā)布到所有綁定的隊(duì)列中
交換機(jī)和隊(duì)列都創(chuàng)建好了之后,會(huì)把交換機(jī)和隊(duì)列進(jìn)行關(guān)系綁定,也就是交換機(jī)和隊(duì)列建立一個(gè)關(guān)聯(lián)關(guān)系。而且會(huì)設(shè)置一個(gè)routing key(路由密鑰:一定規(guī)則的字符串)用來(lái)標(biāo)識(shí)這是一個(gè)放置什么類型消息的隊(duì)列。

直接交換:根據(jù)消息中的binding_key與綁定的routing_key對(duì)比,一致則放到隊(duì)列中

主題交換:使用binding_key與綁定的routing_key進(jìn)行規(guī)則匹配,成功則放入隊(duì)列

2. 安裝RabbitMQ
sudo apt install rabbitmq-server
# 啟動(dòng)服務(wù) sudo systemctl start rabbitmq-server.service # 查看服務(wù)狀態(tài) sudo systemctl status rabbitmq-server.service # 安裝完成的時(shí)候默認(rèn)有個(gè)用戶 guest ,但是權(quán)限不夠,要?jiǎng)?chuàng)建一個(gè) # administrator 用戶,才可以做為遠(yuǎn)程登錄和發(fā)表訂閱消息: #添加用戶 sudo rabbitmqctl add_user root 123456 #設(shè)置用戶 tag sudo rabbitmqctl set_user_tags root administrator #設(shè)置用戶權(quán)限 sudo rabbitmqctl set_permissions -p / root "." "." ".*" # RabbitMQ 自帶了 web 管理界面,執(zhí)行下面命令開(kāi)啟 sudo rabbitmq-plugins enable rabbitmq_management
訪問(wèn) webUI 界面, 默認(rèn)端口為 15672
至此RabbitMQ安裝成功。
3. 安裝 RabbitMQ 的 C++客戶端庫(kù)
我們這里使用 AMQP-CPP 庫(kù)來(lái)編寫客戶端程序。
先安裝libev網(wǎng)絡(luò)通信庫(kù)。在搭建RabbitMQ客戶端的時(shí)候需要進(jìn)行一個(gè)網(wǎng)絡(luò)通信的事件監(jiān)控。事件監(jiān)控我們可以自己寫poll,epoll但是太麻煩了。這里我們使用第三方網(wǎng)絡(luò)通信框架。RabbitMQ對(duì)libevent、libev等等這些都支持。這里我們選擇的是libvev。
sudo apt install libev-dev #libev 網(wǎng)絡(luò)庫(kù)組件
git clone https://gitee.com/iOceanPlus_Forked/AMQP-CPP.git cd AMQP-CPP/ make make install
至此可以通過(guò) AMQP-CPP 來(lái)操作 rabbitmq
4. AMQP-CPP 庫(kù)的簡(jiǎn)單使用
AMQP-CPP 是用于與 RabbitMq 消息中間件通信的 c++庫(kù)。它能解析從 RabbitMq
服務(wù)發(fā)送來(lái)的數(shù)據(jù),也可以生成發(fā)向 RabbitMq 的數(shù)據(jù)包。AMQP-CPP 庫(kù)不會(huì)向
RabbitMq 建立網(wǎng)絡(luò)連接,所有的網(wǎng)絡(luò)I/O由用戶完成。
- 當(dāng)然,AMQP-CPP 提供了可選的網(wǎng)絡(luò)層接口,它預(yù)定義了 TCP 模塊,用戶就不用自己實(shí)現(xiàn)網(wǎng)絡(luò)IO,我們也可以選擇 libevent、libev、libuv、asio 等異步通信組件,需要手動(dòng)安裝對(duì)應(yīng)的組件。
- AMQP-CPP 完全異步,沒(méi)有阻塞式的系統(tǒng)調(diào)用,不使用線程就能夠應(yīng)用在高性能
- 應(yīng)用中。
- 注意:它需要 c++17 的支持
4.1 使用
AMQP-CPP 的使用有兩種模式:
- 使用默認(rèn)的 TCP 模塊進(jìn)行網(wǎng)絡(luò)通信
- 使用擴(kuò)展的 libevent、libev、libuv、asio 異步通信組件進(jìn)行通信
4.1.1 TCP 模式
- 實(shí)現(xiàn)一個(gè)類繼承自 AMQP::TcpHandler 類, 它負(fù)責(zé)網(wǎng)絡(luò)層的 TCP 連接
- 重寫相關(guān)函數(shù), 其中必須重寫 monitor 函數(shù)
- 在 monitor 函數(shù)中需要實(shí)現(xiàn)的是將 fd 放入 eventloop(select、epoll)中監(jiān)控, 當(dāng) fd可寫可讀就緒之后, 調(diào)用 AMQP-CPP 的 connection->process(fd, flags)方法
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
class MyTcpHandler : public AMQP::TcpHandler
{
/**
*AMQP 庫(kù)在創(chuàng)建新連接時(shí)調(diào)用的方法
*與處理程序相關(guān)聯(lián)。這是對(duì)處理程序的第一次調(diào)用
*@param connection 附加到處理程序的連接
*/
virtual void onAttached(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您自己的實(shí)現(xiàn),例如初始化事物
// 以處理連接。
}
/**
*當(dāng) TCP 連接時(shí)由 AMQP 庫(kù)調(diào)用的方法
*已經(jīng)建立。調(diào)用此方法后,庫(kù)
*仍然需要設(shè)置可選的 TLS 層和
*在 TCP 層的頂部建立 AMQP 連接。,這種方法
*總是與稍后對(duì) onLost()的調(diào)用配對(duì)。
*@param connection 現(xiàn)在可以使用的連接
*/
virtual void onConnected(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您自己的實(shí)現(xiàn)(可能不需要)
}
/**
*在建立安全 TLS 連接時(shí)調(diào)用的方法。
*這只對(duì) amqps://連接調(diào)用。它允許您檢查連接是否足夠安全,以滿足
您的喜好
*(例如,您可以檢查服務(wù)器證書)。AMQP 協(xié)議仍然需要啟動(dòng)。
*@param connection 已被保護(hù)的連接
*@param ssl 來(lái)自 openssl 庫(kù)的 ssl 結(jié)構(gòu)
*@return bool 如果可以使用連接,則為 True
*/
virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
{
//@todo
// 添加您自己的實(shí)現(xiàn),例如讀取證書并檢查它是否確實(shí)是您的
return true;
}
/**
*當(dāng)?shù)卿泧L試成功時(shí)由 AMQP 庫(kù)調(diào)用的方法。在此之后,連接就可以使用
了。
*@param connection 現(xiàn)在可以使用的連接
*/
virtual void onReady(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您自己的實(shí)現(xiàn),例如通過(guò)創(chuàng)建一個(gè)通道實(shí)例,然后開(kāi)始發(fā)布或使用
}
/**
*該方法在服務(wù)器嘗試協(xié)商檢測(cè)信號(hào)間隔時(shí)調(diào)用,
*并被覆蓋以擺脫默認(rèn)實(shí)現(xiàn)(否決建議的檢測(cè)信號(hào)間隔),轉(zhuǎn)而接受該間
隔。
*@param connection 發(fā)生錯(cuò)誤的連接
*@param interval 建議的間隔(秒)
*/
virtual uint16_t onNegotiate(AMQP::TcpConnection *connection,uint16_t interval)
{
// 我們接受服務(wù)器的建議,但如果間隔小于一分鐘,我們將使用一分鐘的間隔
if (interval < 60)
interval = 60;
//@todo
// 在事件循環(huán)中設(shè)置一個(gè)計(jì)時(shí)器,
// 如果在這段時(shí)間內(nèi)沒(méi)有發(fā)送其他指令,
// 請(qǐng)確保每隔 interval 秒調(diào)用 connection->heartbeat()。
// 返回我們要使用的間隔
return interval;
}
/**
* *發(fā)生致命錯(cuò)誤時(shí)由 AMQP 庫(kù)調(diào)用的方法
例如,因?yàn)闊o(wú)法識(shí)別從 RabbitMQ 接收的數(shù)據(jù),或者基礎(chǔ)連接丟失。
此調(diào)用之后通常會(huì)調(diào)用 onLost()(如果錯(cuò)誤發(fā)生在 TCP 連接建立之
后)和 onDetached()。
*@param connection 發(fā)生錯(cuò)誤的連接
*@param message 一條人類可讀的錯(cuò)誤消息
*/
virtual void onError(AMQP::TcpConnection *connection, const char *message) override
{
//@todo
// 添加您自己的實(shí)現(xiàn),例如,通過(guò)向程序的用戶報(bào)告錯(cuò)誤并記錄錯(cuò)誤
}
/**
*該方法在 AMQP 協(xié)議結(jié)束時(shí)調(diào)用的方法。這是調(diào)用 connection.close
()以正常關(guān)閉連接的計(jì)數(shù)器部分。請(qǐng)注意,TCP 連接此時(shí)仍處于活動(dòng)狀態(tài),您還
將收到對(duì) onLost()和 onDetached()的調(diào)用
@param connection AMQP 協(xié)議結(jié)束的連接
*/
virtual void onClosed(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您自己的實(shí)現(xiàn), 可能沒(méi)有必要,
// 但如果您想在 amqp 連接結(jié)束后立即執(zhí)行某些操作,
// 又不想等待 tcp 連接關(guān)閉,則這可能會(huì)很有用
}
/**
*當(dāng) TCP 連接關(guān)閉或丟失時(shí)調(diào)用的方法。
*如果同時(shí)調(diào)用了 onConnected(),則始終調(diào)用此方法
*@param connection 已關(guān)閉但現(xiàn)在無(wú)法使用的連接
*/
virtual void onLost(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您自己的實(shí)現(xiàn)(可能沒(méi)有必要)
}
/**
*調(diào)用的最終方法。這表示將不再對(duì)處理程序進(jìn)行有關(guān)連接的進(jìn)一步調(diào)
用。
*@param connection 可以被破壞的連接
*/
virtual void onDetached(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您自己的實(shí)現(xiàn),如清理資源或退出應(yīng)用程序
}
/**
*當(dāng) AMQP-CPP 庫(kù)想要與主事件循環(huán)交互時(shí),它會(huì)調(diào)用該方法。
*AMQP-CPP 庫(kù)是完全不阻塞的,
*并且只有在事先知道這些調(diào)用不會(huì)阻塞時(shí)才進(jìn)行“write()”或“read()”系統(tǒng)
調(diào)用。
*要在事件循環(huán)中注冊(cè)文件描述符,它會(huì)調(diào)用這個(gè)“monitor()”方法,
*該方法帶有一個(gè)文件描述符和指示是否該檢查文件描述符的可讀性或可寫性
的標(biāo)志。
*
*@param connection 想要與事件循環(huán)交互的連接
*@param fd 應(yīng)該檢查的文件描述符
*@param 標(biāo)記位或 AMQP::可讀和/或 AMQP::可寫
*/
virtual void monitor(AMQP::TcpConnection *connection, int fd,int flags) override
{
//@todo
// 添加您自己的實(shí)現(xiàn),
// 例如將文件描述符添加到主應(yīng)用程序事件循環(huán)(如 select()或 poll()循環(huán))。
// 當(dāng)事件循環(huán)報(bào)告描述符變?yōu)榭勺x和或可寫時(shí),
// 由您通過(guò)調(diào)用 connection->process(fd,flags)方法
// 通知 AMQP-CPP 庫(kù)文件描述符處于活動(dòng)狀態(tài)。
}
};4.1.2 擴(kuò)展模式
以 libev 為例, 我們不必要自己實(shí)現(xiàn) monitor 函數(shù), 可以直接使用AMQP::LibEvHandler
4.2 常用類與接口介紹
4.2.1 Channel
channel(信道類) 是一個(gè)虛擬連接,大佬認(rèn)為一個(gè)socket只用于一個(gè)連接太浪費(fèi)了,所有在socket之上又做了封裝,一個(gè)連接上可以建立多個(gè)信道。每個(gè)信道都可以支持一個(gè)客戶端和服務(wù)器進(jìn)行通信。并且所有的 RabbitMq 指令都是通過(guò) channel 傳輸,所以連接建立后的第一步,就是建立 channel。因?yàn)樗胁僮魇钱惒降?,所以?channel 上執(zhí)行指令的返回值并不能作為操作執(zhí)行結(jié)果,實(shí)際上它返回的是 Deferred 類,可以使用它安裝處理函數(shù)。
namespace AMQP
{
/**
* Generic callbacks that are used by many deferred objects
*/
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char* message)>;
using FinalizeCallback = std::function<void()>;
/**
* Declaring and deleting a queue
*/
using QueueCallback = std::function<void(const std::string& name,
uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using MessageCallback = std::function<void(
const Message &message,
uint64_t deliveryTag,
bool redelivered)>;
// 當(dāng)使用發(fā)布者確認(rèn)時(shí),當(dāng)服務(wù)器確認(rèn)消息已被接收和處理時(shí),將調(diào)用
AckCallback using AckCallback = std::function<void(
uint64_t deliveryTag,
bool multiple)>;
// 使用確認(rèn)包裹通道時(shí),當(dāng)消息被 ack/nacked 時(shí),會(huì)調(diào)用這些回調(diào)
using PublishAckCallback = std::function<void()>;
using PublishNackCallback = std::function<void()>;
using PublishLostCallback = std::function<void()>;
class Channel
{
//構(gòu)造函數(shù)
Channel(Connection *connection);
//判斷是否連接成功
bool connected();
/**
*聲明交換機(jī),交換機(jī)已經(jīng)存在就ok,不存在就創(chuàng)建
*如果提供了一個(gè)空名稱,則服務(wù)器將分配一個(gè)名稱。
*以下 flags 可用于交換機(jī):
*
*-durable 持久化,重啟后交換機(jī)依然有效
*-autodelete 刪除所有連接的隊(duì)列后,自動(dòng)刪除交換
*-passive 僅被動(dòng)檢查交換機(jī)是否存在
*-internal 創(chuàng)建內(nèi)部交換
*
*@param name 交換機(jī)的名稱
*@param-type 交換類型
enum ExchangeType
{
fanout, 廣播交換,綁定的隊(duì)列都能拿到消息
direct, 直接交換,只將消息交給 routingkey 一致的隊(duì)列
topic, 主題交換,將消息交給符合 bindingkey 規(guī)則的隊(duì)列
headers,
consistent_hash,
message_deduplication
};
*@param flags 交換機(jī)標(biāo)志
*@param arguments 其他參數(shù)
*
*此函數(shù)返回一個(gè)延遲處理程序Deferred類??梢栽O(shè)置回調(diào)函數(shù)
using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &declareExchange(
const std::string_view &name,
ExchangeType type,
int flags,
const Table &arguments)
/**
*聲明隊(duì)列
*如果不提供名稱,服務(wù)器將分配一個(gè)名稱。
*flags 可以是以下值的組合:
*
*-durable 持久隊(duì)列在代理重新啟動(dòng)后仍然有效
*-autodelete 當(dāng)所有連接的使用者都離開(kāi)時(shí),自動(dòng)刪除隊(duì)列
*-passive 僅被動(dòng)檢查隊(duì)列是否存在
*-exclusive 隊(duì)列僅存在于此連接,并且在連接斷開(kāi)時(shí)自動(dòng)刪除
*
*@param name 隊(duì)列的名稱
*@param flags 標(biāo)志組合
*@param arguments 可選參數(shù)
*
*此函數(shù)返回一個(gè)延遲處理程序DeferredQueue類??梢栽O(shè)置回調(diào)函數(shù)
*使用 onSuccess()、onError()和 onFinalize()方法。
*
Deferred &onError(const char *message)
*
*可以安裝的 onSuccess()回調(diào)應(yīng)該具有以下簽名:
void myCallback(const std::string &name,
uint32_t messageCount,
uint32_t consumerCount);
例如:
channel.declareQueue("myqueue").onSuccess(
[](const std::string &name,
uint32_t messageCount,
uint32_t consumerCount) {
std::cout << "Queue '" << name << "' ";
std::cout << "has been declared with ";
std::cout << messageCount;
std::cout << " messages and ";
std::cout << consumerCount;
std::cout << " consumers" << std::endl;
* });
*/
DeferredQueue &declareQueue(
const std::string_view &name,
int flags,
const Table &arguments)
/**
*將隊(duì)列綁定到交換機(jī)
*
*@param exchange 源交換機(jī)
*@param queue 目標(biāo)隊(duì)列
*@param routingkey 路由密鑰
*@param arguments 其他綁定參數(shù)
*
*此函數(shù)返回一個(gè)延遲處理程序??梢园惭b回調(diào)
*使用 onSuccess()、onError()和 onFinalize()方法。
*/
Deferred &bindQueue(
const std::string_view &exchange,
const std::string_view &queue,
const std::string_view &routingkey,
const Table &arguments)
/**
*將消息發(fā)布到 exchange
*您必須提供交換機(jī)的名稱和路由密鑰。
然后,RabbitMQ 將嘗試將消息發(fā)送到一個(gè)或多個(gè)隊(duì)列。
使用可選的 flags 參數(shù),可以指定如果消息無(wú)法路由到隊(duì)列時(shí)應(yīng)該發(fā)生的情況。
默認(rèn)情況下,不可更改的消息將被靜默地丟棄。
*
*如果設(shè)置了'mandatory'或'immediate'標(biāo)志,
則無(wú)法處理的消息將返回到應(yīng)用程序。
在開(kāi)始發(fā)布之前,請(qǐng)確保您已經(jīng)調(diào)用了 recall()-方法,
并設(shè)置了所有適當(dāng)?shù)奶幚沓绦騺?lái)處理這些返回的消息。
*
*可以提供以下 flags:
*
*-mandatory 如果設(shè)置,服務(wù)器將返回未發(fā)送到隊(duì)列的消息
*-immediate 如果設(shè)置,服務(wù)器將返回?zé)o法立即轉(zhuǎn)發(fā)給使用者的消息。
*@param exchange 要發(fā)布到的交易所
*@param routingkey 路由密鑰
*@param envelope 要發(fā)送的完整信封
*@param message 要發(fā)送的消息
*@param size 消息的大小
*@param flags 可選標(biāo)志
*/
bool publish(
const std::string_view &exchange,
const std::string_view &routingKey,
const std::string &message,
int flags = 0)
/**
*告訴 RabbitMQ 服務(wù)器我們已準(zhǔn)備好使用消息-也就是訂閱那個(gè)隊(duì)列消息
*
*調(diào)用此方法后,RabbitMQ 開(kāi)始向客戶端應(yīng)用程序傳遞消息。
consumer tag 是一個(gè)字符串標(biāo)識(shí)符,
如果您以后想通過(guò) channel::cancel()調(diào)用停止它,
可以使用它來(lái)標(biāo)識(shí)使用者。
*如果您沒(méi)有指定使用者 tag,服務(wù)器將為您分配一個(gè)。
*
*支持以下 flags:
*
*-nolocal 如果設(shè)置了,則不會(huì)同時(shí)消耗在此通道上發(fā)布的消息
*-noack 如果設(shè)置了,則不必對(duì)已消費(fèi)的消息進(jìn)行確認(rèn)
*-exclusive 請(qǐng)求獨(dú)占訪問(wèn),只有此使用者可以訪問(wèn)隊(duì)列
*
*@param queue 您要使用的隊(duì)列
*@param tag 將與此消費(fèi)操作關(guān)聯(lián)的消費(fèi)者標(biāo)記
*@param flags 其他標(biāo)記
*@param arguments 其他參數(shù)
*
*此函數(shù)返回一個(gè)延遲處理程序。
可以使用 onSuccess()、onError()和 onFinalize()方法安裝回
調(diào)。
可以安裝的 onSuccess()回調(diào)應(yīng)該具有以下格式:
void myCallback(const std::string_view&tag);
樣例:
channel.consume("myqueue").onSuccess(
[](const std::string_view& tag) {
std::cout << "Started consuming under tag ";
std::cout << tag << std::endl;
});
*/
DeferredConsumer &consume(
const std::string_view &queue,
const std::string_view &tag,
int flags,
const Table &arguments)
/**
*確認(rèn)接收到的消息
*
*當(dāng)在 DeferredConsumer::onReceived()方法中接收到消息進(jìn)行處理之后,
必須確認(rèn)該消息,
以便 RabbitMQ 將其從隊(duì)列中刪除(除非使用 noack 選項(xiàng)消費(fèi))。
*
*支持以下標(biāo)志:
*
*-多條確認(rèn)多條消息:之前傳遞的所有未確認(rèn)消息也會(huì)得到確認(rèn)
*
*@param deliveryTag 消息的唯一 delivery 標(biāo)簽
*@param flags 可選標(biāo)志
*@return bool
*/
bool ack(uint64_t deliveryTag, int flags = 0)
}
class DeferredConsumer
{
/*
注冊(cè)一個(gè)回調(diào)函數(shù),該函數(shù)在消費(fèi)者啟動(dòng)時(shí)被調(diào)用。
void onSuccess(const std::string &consumertag)
*/
DeferredConsumer &onSuccess(const ConsumeCallback &callback)
/*
注冊(cè)回調(diào)函數(shù),用于接收到一個(gè)完整消息的時(shí)候被調(diào)用
void MessageCallback(const AMQP::Message &message,
uint64_t deliveryTag, bool redelivered)
*/
DeferredConsumer &onReceived(const MessageCallback &callback)
/* Alias for onReceived() */
DeferredConsumer &onMessage(const MessageCallback &callback)
/*
注冊(cè)要在服務(wù)器取消消費(fèi)者時(shí)調(diào)用的函數(shù)
void CancelCallback(const std::string &tag)
*/
DeferredConsumer &onCancelled(const CancelCallback &callback)
}
class Message : public Envelope
{
const std::string &exchange()
const std::string &routingkey():q
}
class Envelope : public MetaData
{
const char *body()
uint64_t bodySize()
}
}類與接口的介紹總結(jié):
AMQP::Channel:信道類
- Channel(Connection *connection) 構(gòu)造
- bool connected() 判斷連接
- Deferred &declareExchange() 聲明交換機(jī)
- DeferredQueue &declareQueue() 聲明隊(duì)列
- Deferred& bindQueue)() 將交換機(jī)與隊(duì)列進(jìn)行關(guān)系綁定的功能
- bool publish() 發(fā)布消息
- DeferredConsumer&consume() 定訂閱隊(duì)列消息
- bool ack() 消費(fèi)者客戶端對(duì)收到的消息進(jìn)行確認(rèn)應(yīng)答
class Message:消息類
- const char* body() 獲取消息正文
- uint64_t bodySize() 獲取消息正文大小
4.3.2 ev
typedef struct ev_async
{
EV_WATCHER(ev_async)
EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{
EVBREAK_CANCEL = 0, /* undo unloop */
EVBREAK_ONE = 1, /* unloop once */
EVBREAK_ALL = 2 /* unloop all loops */
};
//實(shí)例化并獲取I/O事件監(jiān)控結(jié)構(gòu)句柄
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0))
#define EV_DEFAULT ev_default_loop(0)(使用宏獲取上面結(jié)構(gòu))
//開(kāi)始運(yùn)行I/O事件監(jiān)控,這是一個(gè)阻塞接口(創(chuàng)建一個(gè)線程執(zhí)行該接口)
int ev_run(struct ev_loop *loop);
/* break out of the loop */
//結(jié)束I/O監(jiān)控
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents)
//如果在當(dāng)前線程進(jìn)行ev_run則可以直接調(diào)用,如果在其他線程中進(jìn)行ev_run需要通過(guò)異步通知進(jìn)行
void ev_async_init(ev_async *w, callback cb);//初始化異步事件結(jié)構(gòu),并設(shè)置回調(diào)函數(shù)
void ev_async_start(struct ev_loop *loop, ev_async *w);//啟動(dòng)事件監(jiān)控循環(huán)中的異步任務(wù)處理
void ev_async_send(struct ev_loop *loop, ev_async *w);//發(fā)送當(dāng)前異步事件到異步線程中執(zhí)行第三方庫(kù)鏈接
g++ -o example example.cpp -lamqpcpp -lev
5. RabbitMQ樣例編寫
5.1 發(fā)布消息
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main()
{
//1. 實(shí)例化底層網(wǎng)絡(luò)通信框架的I/O事件監(jiān)控句柄
auto *loop = EV_DEFAULT;
//2. 實(shí)例化libEvHandler句柄 --- 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來(lái)
AMQP::LibEvHandler handler(loop);
//3. 實(shí)例化連接對(duì)象
AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
AMQP::TcpConnection connection(&handler, address);
//4. 實(shí)例化信道對(duì)象
AMQP::TcpChannel channel(&connection);
//5. 聲明交換機(jī)
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
.onError([](const char *message) {
std::cout << "聲明交換機(jī)失?。? << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange 交換機(jī)創(chuàng)建成功!" << std::endl;
});
//6. 聲明隊(duì)列
channel.declareQueue("test-queue")
.onError([](const char *message) {
std::cout << "聲明隊(duì)列失?。? << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-queue 隊(duì)列創(chuàng)建成功!" << std::endl;
});
//7. 針對(duì)交換機(jī)和隊(duì)列進(jìn)行綁定
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message) {
std::cout << "test-exchange - test-queue 綁定失敗:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange - test-queue 綁定成功!" << std::endl;
});
//8. 向交換機(jī)發(fā)布消息
for (int i = 0; i < 10; i++) {
std::string msg = "Hello Bite-" + std::to_string(i);
bool ret = channel.publish("test-exchange", "test-queue-key", msg);
if (ret == false) {
std::cout << "publish 失??!\n";
}
}
//啟動(dòng)底層網(wǎng)絡(luò)通信框架--開(kāi)啟I/O
ev_run(loop, 0);
return 0;
}5.2 訂閱消息
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
//消息回調(diào)處理函數(shù)的實(shí)現(xiàn)
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
{
std::string msg;
msg.assign(message.body(), message.bodySize());
std::cout << msg << std::endl;
channel->ack(deliveryTag); // 對(duì)消息進(jìn)行確認(rèn)
}
int main()
{
//1. 實(shí)例化底層網(wǎng)絡(luò)通信框架的I/O事件監(jiān)控句柄
auto *loop = EV_DEFAULT;
//2. 實(shí)例化libEvHandler句柄 --- 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來(lái)
AMQP::LibEvHandler handler(loop);
//2.5. 實(shí)例化連接對(duì)象
AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
AMQP::TcpConnection connection(&handler, address);
//3. 實(shí)例化信道對(duì)象
AMQP::TcpChannel channel(&connection);
//4. 聲明交換機(jī)
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
.onError([](const char *message) {
std::cout << "聲明交換機(jī)失?。? << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange 交換機(jī)創(chuàng)建成功!" << std::endl;
});
//5. 聲明隊(duì)列
channel.declareQueue("test-queue")
.onError([](const char *message) {
std::cout << "聲明隊(duì)列失?。? << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-queue 隊(duì)列創(chuàng)建成功!" << std::endl;
});
//6. 針對(duì)交換機(jī)和隊(duì)列進(jìn)行綁定
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message) {
std::cout << "test-exchange - test-queue 綁定失敗:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange - test-queue 綁定成功!" << std::endl;
});
//7. 訂閱隊(duì)列消息 -- 設(shè)置消息處理回調(diào)函數(shù)
auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel.consume("test-queue", "consume-tag") //返回值 DeferredConsumer
.onReceived(callback)
.onError([](const char *message){
std::cout << "訂閱 test-queue 隊(duì)列消息失敗:" << message << std::endl;
exit(0);
}); // 返回值是 AMQP::Deferred
//8. 啟動(dòng)底層網(wǎng)絡(luò)通信框架--開(kāi)啟I/O
ev_run(loop, 0);
return 0;
}all : publish consume publish : publish.cc g++ -std=c++17 $^ -o $@ -lamqpcpp -lev consume : consume.cc g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
到此這篇關(guān)于C++ RabbitMq消息隊(duì)列組件的文章就介紹到這了,更多相關(guān)C++ RabbitMq消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C語(yǔ)言函數(shù)傳遞數(shù)組和傳遞地址的區(qū)別你知道嗎
這篇文章主要介紹了C語(yǔ)言中數(shù)組作為函數(shù)的參數(shù)以及返回值的使用簡(jiǎn)單入門,這里以一維數(shù)組作為基本條件進(jìn)行例子講解,需要的朋友可以參考下2021-09-09
圖解C++的STL之stack和queue,輕松理解數(shù)據(jù)結(jié)構(gòu)
聚焦?C++?的?STL?中的?stack?和?queue,讓數(shù)據(jù)結(jié)構(gòu)變得簡(jiǎn)單有趣!?通過(guò)圖解的方式,我們將輕松理解這兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu),準(zhǔn)備好開(kāi)啟?STL?學(xué)習(xí)之旅了嗎?讓我們一起探索?stack?和?queue?的奧秘吧!2024-03-03
詳解C語(yǔ)言中的ttyname()函數(shù)和isatty()函數(shù)的用法
這篇文章主要介紹了C語(yǔ)言中的ttyname()函數(shù)和isatty()函數(shù)的用法,是C語(yǔ)言入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-09-09
C語(yǔ)言用fun函數(shù)實(shí)現(xiàn)兩個(gè)數(shù)的交換方式
這篇文章主要介紹了C語(yǔ)言用fun函數(shù)實(shí)現(xiàn)兩個(gè)數(shù)的交換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12

