C++ 第三方庫 RabbitMq示例詳解
1.介紹
RabbitMQ:消息隊列組件,實現(xiàn)兩個客戶端主機(jī)之間消息傳輸?shù)墓δ?發(fā)布&訂閱)
- 核心概念:交換機(jī)、隊列、綁定、消息
- 交換機(jī)類型:
- 廣播交換:當(dāng)交換機(jī)收到消息,則將消息發(fā)布到所有綁定的隊列中
- 直接交換:根據(jù)消息中的
bkey與綁定的rkey對比,一致則放入隊列 - 主題交換:使用
bkey與綁定的rkey進(jìn)行規(guī)則匹配,成功則放入隊列
2.安裝
1.RabbitMq
- 安裝:
sudo apt install rabbitmq-server - 簡單使用:
# 安裝完成的時候默認(rèn)有個用戶guest,但是權(quán)限不夠,要創(chuàng)建一個administrator用戶,才可以做為遠(yuǎn)程登錄和發(fā)表訂閱消息 #添加用戶 sudo rabbitmqctl add_user root <PASSWORD> #設(shè)置用戶tag sudo rabbitmqctl set_user_tags root administrator #設(shè)置用戶權(quán)限 sudo rabbitmqctl set_permissions -p / root "." "." ".*" # RabbitMQ自帶了web管理界面, 執(zhí)行下面命令開啟, 默認(rèn)端口15672 sudo rabbitmq-plugins enable rabbitmq_management
2.客戶端庫
sudo apt install libev-dev #libev 網(wǎng)絡(luò)庫組件 git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git cd AMQP-CPP/ make make install
如果安裝時出現(xiàn)以下報錯,則表示ssl版本出現(xiàn)問題
/usr/include/openssl/macros.h:147:4: error: #error
"OPENSSL_API_COMPAT expresses an impossible API compatibility
level"
147 | # error "OPENSSL_API_COMPAT expresses an impossible API
compatibility level"
| ^~~~~
In file included from /usr/include/openssl/ssl.h:18,
from linux_tcp/openssl.h:20,
from linux_tcp/openssl.cpp:12:
/usr/include/openssl/bio.h:687:1: error: expected constructor,
destructor, or type conversion before ‘DEPRECATEDIN_1_1_0'
687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str,
unsigned short *port_ptr))解決方案:卸載當(dāng)前的ssl庫,重新進(jìn)行修復(fù)安裝
dpkg -l | grep ssl sudo dpkg -P --force-all libevent-openssl-2.1-7 sudo dpkg -P --force-all openssl sudo dpkg -P --force-all libssl-dev sudo apt --fix-broken install
3.AMQP-CPP
簡單使用
1.介紹
AMQP-CPP是用于與RabbitMq消息中間件通信的C++庫- 它能解析從
RabbitMq服務(wù)發(fā)送來的數(shù)據(jù),也可以生成發(fā)向RabbitMq的數(shù)據(jù)包 AMQP-CPP庫不會向RabbitMq建立網(wǎng)絡(luò)連接,所有的網(wǎng)絡(luò)IO由用戶完成
- 它能解析從
AMQP-CPP提供了可選的網(wǎng)絡(luò)層接口,它預(yù)定義了TCP模塊,用戶就不用自己實現(xiàn)網(wǎng)絡(luò)IO,- 也可以選擇
libevent、libev、libuv、asio等異步通信組件, 需要手動安裝對應(yīng)的組件
- 也可以選擇
AMQP-CPP完全異步,沒有阻塞式的系統(tǒng)調(diào)用,不使用線程就能夠應(yīng)用在高性能應(yīng)用中- 注意:它需要C++17的支持
2.使用
AMQP-CPP的使用有兩種模式:- 使用默認(rèn)的
TCP模塊進(jìn)行網(wǎng)絡(luò)通信 - 使用擴(kuò)展的
libevent、libev、libuv、asio異步通信組件進(jìn)行通信
- 使用默認(rèn)的
- 此處以
libev為例,不需要自己實現(xiàn)monitor函數(shù),可以直接使用AMQP::LibEvHandler
4.類與接口
1.Channel
channel是一個虛擬連接,一個連接上可以建立多個通道- 并且所有的
RabbitMq指令都是通過channel傳輸
- 并且所有的
- 所以連接建立后的第一步,就是建立
channel- 因為所有操作是異步的,所以在
channel上執(zhí)行指令的返回值并不能作為操作執(zhí)行結(jié)果
- 因為所有操作是異步的,所以在
- 實際上它返回的是
Deferred類,可以使用它安裝處理函數(shù)
namespace AMQP
{
/**
* Generic callbacks that are used by many deferred objects
*/
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
/**
* Declaring and deleting a queue
*/
using QueueCallback = std::function<void(const std::string &name,
uint32_t messagecount,
uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using MessageCallback = std::function<void(const Message &message,
uint64_t deliveryTag,
bool redelivered)>;
// 當(dāng)使用發(fā)布者確認(rèn)時,當(dāng)服務(wù)器確認(rèn)消息已被接收和處理時,將調(diào)用AckCallback
using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
// 使用確認(rèn)包裹通道時,當(dāng)消息被ack/nacked時,會調(diào)用這些回調(diào)
using PublishAckCallback = std::function<void()>;
using PublishNackCallback = std::function<void()>;
using PublishLostCallback = std::function<void()>;
// 信道類
class Channel
{
Channel(Connection *connection);
bool connected();
/**
*聲明交換機(jī)
*如果提供了一個空名稱,則服務(wù)器將分配一個名稱。
*以下flags可用于交換機(jī):
*
*-durable 持久化,重啟后交換機(jī)依然有效
*-autodelete 刪除所有連接的隊列后,自動刪除交換
*-passive 僅被動檢查交換機(jī)是否存在
*-internal 創(chuàng)建內(nèi)部交換
*
*@param name 交換機(jī)的名稱
*@param-type 交換類型
enum ExchangeType
{
fanout, 廣播交換,綁定的隊列都能拿到消息
direct, 直接交換,只將消息交給routingkey一致的隊列
topic, 主題交換,將消息交給符合bindingkey規(guī)則的隊列
headers,
consistent_hash,
message_deduplication
};
*@param flags 交換機(jī)標(biāo)志
*@param arguments其他參數(shù)
*
*此函數(shù)返回一個延遲處理程序??梢园惭b回調(diào)
using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &declareExchange(const std::string_view &name,
ExchangeType type,
int flags,
const Table &arguments);
/**
*聲明隊列
*如果不提供名稱,服務(wù)器將分配一個名稱。
*flags可以是以下值的組合:
*
*-durable 持久隊列在代理重新啟動后仍然有效
*-autodelete 當(dāng)所有連接的使用者都離開時,自動刪除隊列
*-passive 僅被動檢查隊列是否存在
*-exclusive 隊列僅存在于此連接,并且在連接斷開時自動刪除
*
*@param name 隊列的名稱
*@param flags 標(biāo)志組合
*@param arguments 可選參數(shù)
*
*此函數(shù)返回一個延遲處理程序。可以安裝回調(diào)
*使用onSuccess()、onError()和onFinalize()方法。
*
Deferred &onError(const char *message)
*
*可以安裝的onSuccess()回調(diào)應(yīng)該具有以下簽名:
void myCallback(const std::string &name,
uint32_t messageCount,
uint32_t consumerCount);
例如:
channel.declareQueue("myqueue").onSuccess(
[](const std::string &name,
uint32_t messageCount,
uint32_t consumerCount) {
std::cout << "Queue '" << name << "' ";
std::cout << "has been declared with ";
std::cout << messageCount;
std::cout << " messages and ";
std::cout << consumerCount;
std::cout << " consumers" << std::endl;
* });
*/
DeferredQueue &declareQueue(const std::string_view &name,
int flags,
const Table &arguments);
/**
*將隊列綁定到交換機(jī)
*
*@param exchange 源交換機(jī)
*@param queue 目標(biāo)隊列
*@param routingkey 路由密鑰
*@param arguments 其他綁定參數(shù)
*
*此函數(shù)返回一個延遲處理程序??梢园惭b回調(diào)
*使用onSuccess()、onError()和onFinalize()方法。
*/
Deferred &bindQueue(const std::string_view &exchange,
const std::string_view &queue,
const std::string_view &routingkey,
const Table &arguments);
/**
*將消息發(fā)布到exchange
*您必須提供交換機(jī)的名稱和路由密鑰。
然后,RabbitMQ將嘗試將消息發(fā)送到一個或多個隊列。
使用可選的flags參數(shù),可以指定如果消息無法路由到隊列時應(yīng)該發(fā)生的情況。
默認(rèn)情況下,不可更改的消息將被靜默地丟棄。
*
*如果設(shè)置了'mandatory'或'immediate'標(biāo)志,
則無法處理的消息將返回到應(yīng)用程序。
在開始發(fā)布之前,請確保您已經(jīng)調(diào)用了recall()-方法,
并設(shè)置了所有適當(dāng)?shù)奶幚沓绦騺硖幚磉@些返回的消息。
*
*可以提供以下flags:
*
*-mandatory 如果設(shè)置,服務(wù)器將返回未發(fā)送到隊列的消息
*-immediate 如果設(shè)置,服務(wù)器將返回?zé)o法立即轉(zhuǎn)發(fā)給使用者的消息。
*@param exchange要發(fā)布到的交易所
*@param routingkey路由密鑰
*@param envelope要發(fā)送的完整信封
*@param message要發(fā)送的消息
*@param size消息的大小
*@param flags可選標(biāo)志
*/
bool publish(const std::string_view &exchange,
const std::string_view &routingKey,
const std::string &message,
int flags = 0);
/**
*告訴RabbitMQ服務(wù)器已準(zhǔn)備好使用消息-也就是 訂閱隊列消息
*
*調(diào)用此方法后,RabbitMQ開始向客戶端應(yīng)用程序傳遞消息。
consumer tag是一個字符串標(biāo)識符,
如果您以后想通過channel::cancel()調(diào)用停止它,
可以使用它來標(biāo)識使用者。
*如果您沒有指定使用者tag,服務(wù)器將為您分配一個。
*
*支持以下flags:
*
*-nolocal 如果設(shè)置了,則不會同時消耗在此通道上發(fā)布的消息
*-noack 如果設(shè)置了,則不必對已消費的消息進(jìn)行確認(rèn)
*-exclusive 請求獨占訪問,只有此使用者可以訪問隊列
*
*@param queue 您要使用的隊列
*@param tag 將與此消費操作關(guān)聯(lián)的消費者標(biāo)記
*@param flags 其他標(biāo)記
*@param arguments其他參數(shù)
*
*此函數(shù)返回一個延遲處理程序。
可以使用onSuccess()、onError()和onFinalize()方法安裝回調(diào)
可以安裝的onSuccess()回調(diào)應(yīng)該具有以下格式:
void myCallback(const std::string_view&tag);
樣例:
channel.consume("myqueue").onSuccess(
[](const std::string_view& tag) {
std::cout << "Started consuming under tag ";
std::cout << tag << std::endl;
});
*/
DeferredConsumer &consume(const std::string_view &queue,
const std::string_view &tag,
int flags,
const Table &arguments);
/**
*確認(rèn)接收到的消息
*
*消費者客戶端對收到的消息進(jìn)行確認(rèn)應(yīng)答
*
*當(dāng)在DeferredConsumer::onReceived()方法中接收到消息時,
必須確認(rèn)該消息,
以便RabbitMQ將其從隊列中刪除(除非使用noack選項消費)
*
*支持以下標(biāo)志:
*
*-多條確認(rèn)多條消息:之前傳遞的所有未確認(rèn)消息也會得到確認(rèn)
*
*@param deliveryTag 消息的唯一delivery標(biāo)簽
*@param flags 可選標(biāo)志
*@return bool
*/
bool ack(uint64_t deliveryTag, int flags=0);
};
class DeferredConsumer
{
/*
注冊一個回調(diào)函數(shù),該函數(shù)在消費者啟動時被調(diào)用
void onSuccess(const std::string &consumertag)
*/
DeferredConsumer &onSuccess(const ConsumeCallback& callback);
/*
注冊回調(diào)函數(shù),用于接收到一個完整消息的時候被調(diào)用
void MessageCallback(const AMQP::Message &message,
uint64_t deliveryTag, bool redelivered)
*/
DeferredConsumer &onReceived(const MessageCallback& callback);
/* Alias for onReceived() */
DeferredConsumer &onMessage(const MessageCallback& callback);
/*
注冊要在服務(wù)器取消消費者時調(diào)用的函數(shù)
void CancelCallback(const std::string &tag)
*/
DeferredConsumer &onCancelled(const CancelCallback& callback);
};
class Message : public Envelope
{
const std::string &exchange();
const std::string &routingkey();
};
class Envelope : public MetaData
{
const char *body(); // 獲取消息正文
uint64_t bodySize(); // 獲取消息正文大小
};
}2.ev
typedef struct ev_async
{
EV_WATCHER (ev_async);
EV_ATOMIC_T sent; /* private */
}ev_async;
//break type
enum
{
EVBREAK_CANCEL = 0, /* undo unloop */
EVBREAK_ONE = 1, /* unloop once */
EVBREAK_ALL = 2 /* unloop all loops */
};
// 實例化并獲取IO事件監(jiān)控接口句柄
struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0));
# define EV_DEFAULT ev_default_loop (0)
// 開始運(yùn)行IO事件監(jiān)控, 這是一個阻塞接口
int ev_run (struct ev_loop *loop);
/* break out of the loop */
// 結(jié)束IO監(jiān)控
// 如果在主線程進(jìn)行ev_run(), 則可以直接調(diào)用,
// 如果在其他線程中進(jìn)行ev_run(), 需要通過異步通知進(jìn)行
void ev_break (struct ev_loop *loop, int32_t break_type) ;
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
// 初始化異步事件結(jié)構(gòu), 并設(shè)置回調(diào)函數(shù)
void ev_async_init(ev_async *w, callback cb);
// 啟動事件監(jiān)控循環(huán)中的異步任務(wù)處理
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 發(fā)送當(dāng)前異步事件到異步線程中執(zhí)行
void ev_async_send(struct ev_loop *loop, ev_async *w);5.使用
1.publish.cc
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main()
{
// 1.實例化底層網(wǎng)絡(luò)通信框架的IO事件監(jiān)控句柄
auto *loop = EV_DEFAULT;
// 2.實例化libEvHandler句柄 -> 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來
AMQP::LibEvHandler handler(loop);
// 3.實例化連接對象
AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
AMQP::TcpConnection connection(&handler, address);
// 4.實例化信道對象
AMQP::TcpChannel channel(&connection);
// 5.聲明交換機(jī)
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
.onError([](const char *message)
{ std::cout << "聲明交換機(jī)失敗: " << message << std::endl; })
.onSuccess([]()
{ std::cout << "test-exchange 交換機(jī)創(chuàng)建成功" << std::endl; });
// 6.聲明隊列
channel.declareQueue("test-queue")
.onError([](const char *message)
{ std::cout << "聲明隊列失敗: " << message << std::endl; })
.onSuccess([]()
{ std::cout << "test-queue 隊列創(chuàng)建成功" << std::endl; });
// 7.針對交換機(jī)和隊列進(jìn)行綁定
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message)
{ std::cout << "test-exchange - test-queue 綁定失敗: " \
<< message << std::endl; })
.onSuccess([]()
{ std::cout << "test-exchange - test-queue 綁定成功"
<< std::endl; });
// 8.向交換機(jī)發(fā)布消息
for (int i = 0; i < 5; ++i)
{
std::string msg = "Hello SnowK-" + std::to_string(i);
if(channel.publish("test-exchange", "test-queue-key", msg) == false)
{
std::cout << "publish 失敗" << std::endl;
}
}
// 9.啟動底層網(wǎng)絡(luò)通信框架 -> 開啟IO
ev_run(loop, 0);
return 0;
}2.consume.cc
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
void MessageCB(AMQP::TcpChannel* channel, const AMQP::Message& message,
uint64_t deliveryTag, bool redelivered)
{
std::string msg;
msg.assign(message.body(), message.bodySize());
// 不能這樣使用, AMQP::Message后面沒有存'\0'
// std::cout << message << std::endl
std::cout << msg << std::endl;
channel->ack(deliveryTag);
}
int main()
{
// 1.實例化底層網(wǎng)絡(luò)通信框架的IO事件監(jiān)控句柄
auto *loop = EV_DEFAULT;
// 2.實例化libEvHandler句柄 -> 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來
AMQP::LibEvHandler handler(loop);
// 3.實例化連接對象
AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
AMQP::TcpConnection connection(&handler, address);
// 4.實例化信道對象
AMQP::TcpChannel channel(&connection);
// 5.聲明交換機(jī)
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
.onError([](const char *message)
{ std::cout << "聲明交換機(jī)失敗: " << message << std::endl; })
.onSuccess([]()
{ std::cout << "test-exchange 交換機(jī)創(chuàng)建成功" << std::endl; });
// 6.聲明隊列
channel.declareQueue("test-queue")
.onError([](const char *message)
{ std::cout << "聲明隊列失敗: " << message << std::endl; })
.onSuccess([]()
{ std::cout << "test-queue 隊列創(chuàng)建成功" << std::endl; });
// 7.針對交換機(jī)和隊列進(jìn)行綁定
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message)
{ std::cout << "test-exchange - test-queue 綁定失敗: " \
<< message << std::endl; })
.onSuccess([]()
{ std::cout << "test-exchange - test-queue 綁定成功"; });
// 8.訂閱消息對壘 -> 設(shè)置消息處理回調(diào)函數(shù)
auto callback = std::bind(MessageCB, &channel, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
channel.consume("test-queue", "consume-tag")
.onReceived(callback)
.onError([](const char *message)
{
std::cout << "訂閱 test-queue 隊列消息失敗: " << message << std::endl;
exit(0);
});
// 9.啟動底層網(wǎng)絡(luò)通信框架 -> 開啟IO
ev_run(loop, 0);
return 0;
}3.makefile
all: publish consume publish: publish.cc g++ -o $@ $^ -lamqpcpp -lev -std=c++17 consume: consume.cc g++ -o $@ $^ -lamqpcpp -lev -std=c++17 .PHONY:clean clean: rm publish consume
到此這篇關(guān)于C++ 第三方庫 RabbitMq詳細(xì)講解的文章就介紹到這了,更多相關(guān)C++ 第三方庫 RabbitMq內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C++類中的常數(shù)據(jù)成員與靜態(tài)數(shù)據(jù)成員之間的區(qū)別
常數(shù)據(jù)成員是指在類中定義的不能修改其值的一些數(shù)據(jù)成員,類似于我們以前學(xué)過的常變量,雖然是變量,也有自己的地址,但是一經(jīng)賦初值,便不能再被修改2013-10-10
C語言實現(xiàn)簡易通訊錄(靜態(tài)版本)的代碼分享
這篇文章主要為大家詳細(xì)介紹了如何錄音C語言實現(xiàn)一個簡易的通訊錄(靜態(tài)版本),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-10-10
vscode使用cmake時將命令行參數(shù)傳遞給調(diào)試目標(biāo)的方法
這篇文章主要介紹了vscode使用cmake時將命令行參數(shù)傳遞給調(diào)試目標(biāo),下面介紹了一個示例,將參數(shù)first_arg, second-arg和third arg傳遞給程序(此處需要注意,third arg中間雖然存在空格,但是仍然被視作一個參數(shù)),需要的朋友參考下吧2024-03-03

