C++簡單實現(xiàn)RPC網(wǎng)絡通訊的示例詳解
RPC是遠程調(diào)用系統(tǒng)簡稱,它允許程序調(diào)用運行在另一臺計算機上的過程,就像調(diào)用本地的過程一樣。RPC 實現(xiàn)了網(wǎng)絡編程的“過程調(diào)用”模型,讓程序員可以像調(diào)用本地函數(shù)一樣調(diào)用遠程函數(shù)。最近在做的也是遠程調(diào)用過程,所以通過重新梳理RPC來整理總結一下。
項目來源:
GitHub - qicosmos/rest_rpc: modern C++(C++11), simple, easy to use rpc framework
一、RPC簡介
1.1 簡介
RPC指的是計算機A的進程調(diào)用另外一臺計算機B的進程,A上的進程被掛起,B上被調(diào)用的進程開始執(zhí)行,當B執(zhí)行完畢后將執(zhí)行結果返回給A,A的進程繼續(xù)執(zhí)行。調(diào)用方可以通過使用參數(shù)將信息傳送給被調(diào)用方,然后通過傳回的結果得到信息。這些傳遞的信息都是被加密過或者其他方式處理。這個過程對開發(fā)人員是透明的,因此RPC可以看作是本地過程調(diào)用的一種擴展,使被調(diào)用過程不必與調(diào)用過程位于同一物理機中。

RPC可以用于構建基于B/S模式的分布式應用程序:請求服務是一個客戶端、而服務提供程序是一臺服務器。和常規(guī)和本地的調(diào)用過程一樣,遠程過程調(diào)用是同步操作,在結果返回之前,需要暫時中止請求程序。
RPC的優(yōu)點:
- 支持面向過程和面向線程的模型;
- 內(nèi)部消息傳遞機制對用戶隱藏;
- 基于 RPC 模式的開發(fā)可以減少代碼重寫;
- 可以在本地環(huán)境和分布式環(huán)境中運行;
1.2 本地調(diào)用和遠程調(diào)用的區(qū)別
以ARM環(huán)境為例,我們拆解本地調(diào)用的過程,以下面代碼為例:
int selfIncrement(int a)
{
return a + 1;
}
int a = 10;當執(zhí)行到selfIncrement(a)時,首先把a存入寄存器R0,之后轉(zhuǎn)到函數(shù)地址selfIncrement,執(zhí)行函數(shù)內(nèi)的指令 ADD R0,#1。跳轉(zhuǎn)到函數(shù)的地址偏移量在編譯時確定。
但是如果這是一個遠程調(diào)用,selfIncrement函數(shù)存在于其他機器,為了實現(xiàn)遠程調(diào)用,請求方和服務方需要提供需要解決以下問題:
1. 網(wǎng)絡傳輸。
本地調(diào)用的參數(shù)存放在寄存器或棧中,在同一塊內(nèi)存中,可以直接訪問到。遠程過程調(diào)用需要借助網(wǎng)絡來傳遞參數(shù)和需要調(diào)用的函數(shù) ID。
2. 編解碼
請求方需要將參數(shù)轉(zhuǎn)化為字節(jié)流,服務提供方需要將字節(jié)流轉(zhuǎn)化為參數(shù)。
3. 函數(shù)映射表
服務提供方的函數(shù)需要有唯一的 ID 標識,請求方通過 ID 標識告知服務提供方需要調(diào)用哪個函數(shù)。
以上三個功能即為 RPC 的基本框架所必須包含的功能。
1.3 RPC運行的流程
一次 RPC 調(diào)用的運行流程大致分為如下七步,具體如下圖所示。
1.客戶端調(diào)用客戶端存根程序,將參數(shù)傳入;
2.客戶端存根程序?qū)?shù)轉(zhuǎn)化為標準格式,并編組進消息;
3.客戶端存根程序?qū)⑾l(fā)送到傳輸層,傳輸層將消息傳送至遠程服務器;
4.服務器的傳輸層將消息傳遞到服務器存根程序,存根程序?qū)﹃U述進行解包,并使用本地調(diào)用的機制調(diào)用所需的函數(shù);
5.運算完成之后,將結果返回給服務器存根,存根將結果編組為消息,之后發(fā)送給傳輸層;
6.服務器傳輸層將結果消息發(fā)送給客戶端傳輸層;
7.客戶端存根對返回消息解包,并返回給調(diào)用方。

服務端存根和客戶端存根可以看做是被封裝起來的細節(jié),這些細節(jié)對于開發(fā)人員來說是透明的,但是在客戶端層面看到的是 “本地” 調(diào)用了 selfIncrement() 方法,在服務端層面,則需要封裝、網(wǎng)絡傳輸、解封裝等等操作。因此 RPC 可以看作是傳統(tǒng)本地過程調(diào)用的一種擴展,其使得被調(diào)用過程不必與調(diào)用過程位于同一物理機中。
1.4 小結
RPC 的目標是做到在遠程機器上調(diào)用函數(shù)與本地調(diào)用函數(shù)一樣的體驗。 為了達到這個目的,需要實現(xiàn)網(wǎng)絡傳輸、序列化與反序列化、函數(shù)映射表等功能,其中網(wǎng)絡傳輸可以使用socket或其他,序列化和反序列化可以使用protobuf,函數(shù)映射表可以使用std::function。
lambda與std::function內(nèi)容可以看:
lambda 表達式和 std::function 的功能是類似的,lambda 表達式可以轉(zhuǎn)換為 std::function,一般情況下,更多使用 lambda 表達式,只有在需要回調(diào)函數(shù)的情況下才會使用 std::function。
二、RPC簡單實現(xiàn)
2.1 客戶端實現(xiàn)代碼
#include <iostream>
#include <memory>
#include <thread>
#include <functional>
#include <cstring>
class RPCClient
{
public:
using RPCCallback = std::function<void(const std::string&)>;
RPCClient(const std::string& server_address) : server_address_(server_address) {}
~RPCClient() {}
void Call(const std::string& method, const std::string& request, RPCCallback callback)
{
// 序列化請求數(shù)據(jù)
std::string data = Serialize(method, request);
// 發(fā)送請求
SendRequest(data);
// 開啟線程接收響應
std::thread t([this, callback]() {
std::string response = RecvResponse();
// 反序列化響應數(shù)據(jù)
std::string result = Deserialize(response);
callback(result);
});
t.detach();
}
private:
std::string Serialize(const std::string& method, const std::string& request)
{
// 省略序列化實現(xiàn)
}
void SendRequest(const std::string& data)
{
// 省略網(wǎng)絡發(fā)送實現(xiàn)
}
std::string RecvResponse()
{
// 省略網(wǎng)絡接收實現(xiàn)
}
std::string Deserialize(const std::string& response)
{
// 省略反序列化實現(xiàn)
}
private:
std::string server_address_;
};
int main()
{
std::shared_ptr<RPCClient> client(new RPCClient("127.0.0.1:8000"));
client->Call("Add", "1,2", [](const std::string& result) {
std::cout << "Result: " << result << std::endl;
});
return 0;
}這段代碼定義了RPCClient類來處理客戶端的請求任務,用到了lambda和std::function來處理函數(shù)調(diào)用,在Call中使用多線程技術。main中使用智能指針管理Rpcclient類,并調(diào)用了客戶端的Add函數(shù)。
127.0.0.1為本地地址,對開發(fā)來說需要使用本地地址自測,端口號為8000,需要選擇一個空閑端口來通信。
2.2 服務端代碼
下面是服務端的實現(xiàn)
#include <iostream>
#include <map>
#include <functional>
#include <memory>
#include <thread>
#include <mutex>
// 使用第三方庫實現(xiàn)序列化和反序列化
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/map.hpp>
using namespace std;
// 定義RPC函數(shù)類型
using RPCCallback = std::function<std::string(const std::string&)>;
class RPCHandler {
public:
void registerCallback(const std::string& name, RPCCallback callback) {
std::unique_lock<std::mutex> lock(mtx_);
callbacks_[name] = callback;
}
std::string handleRequest(const std::string& request) {
// 反序列化請求
std::map<std::string, std::string> requestMap;
std::istringstream is(request);
boost::archive::text_iarchive ia(is);
ia >> requestMap;
// 查找并調(diào)用對應的回調(diào)函數(shù)
std::string name = requestMap["name"];
std::string args = requestMap["args"];
std::unique_lock<std::mutex> lock(mtx_);
auto it = callbacks_.find(name);
if (it == callbacks_.end()) {
return "Error: Unknown function";
}
RPCCallback callback = it->second;
return callback(args);
}
private:
std::map<std::string, RPCCallback> callbacks_;
std::mutex mtx_;
};
int main() {
RPCHandler rpcHandler;
// 注冊回調(diào)函數(shù)
rpcHandler.registerCallback("add", [](const std::string& args) {
std::istringstream is(args);
int a, b;
is >> a >> b;
int result = a + b;
std::ostringstream os;
os << result;
return os.str();
});
rpcHandler.registerCallback("sub", [](const std::string& args) {
std::istringstream is(args);
int a, b;
is >> a >> b;
int result = a - b;
std::ostringstream os;
os << result;
return os.str
});
// 創(chuàng)建處理請求的線程
std::thread requestThread([&]() {
while (true) {
std::string request;
std::cin >> request;
std::string response = rpcHandler.handleRequest(request);
std::cout << response << std::endl;
}
});
requestThread.join();
return 0;
}上面的代碼實現(xiàn)了一個簡單的C++ RPC服務端。主要實現(xiàn)了以下功能:
1.定義了RPC函數(shù)類型 RPCCallback,使用std::function<std::string(const std::string&)>表示。
2.RPCHandler類實現(xiàn)了注冊函數(shù)和處理請求的功能。
3.在main函數(shù)中創(chuàng)建了一個RPCHandler對象,并注冊了兩個函數(shù)"add" 和 "sub"。這些函數(shù)通過lambda表達式實現(xiàn),并在被調(diào)用時通過std::istringstream讀取參數(shù)并返回結果。
4.創(chuàng)建了一個新線程requestThread來處理請求。在這個線程中,通過std::cin讀取請求,然后調(diào)用RPCHandler的handleRequest函數(shù)并使用std::cout輸出響應。

注意,這套代碼是最簡單的RPC機制,只能調(diào)用本地的資源,他還存在以下缺點:
1.代碼并沒有處理錯誤處理,如果請求格式不正確或函數(shù)不存在,服務端將會返回“Error: Unknown function”。
2.沒有使用網(wǎng)絡庫進行通信,所以只能在本機上使用。
3.沒有提供高效的并發(fā)性能,所有請求都在單獨的線程中處理。
4.沒有考慮RPC服務的可用性和高可用性,如果服務端崩潰或不可用,客戶端將無法繼續(xù)使用服務。
5.沒有考慮RPC服務的可擴展性,如果有大量請求需要處理,可能會導致性能問題。
6.使用了第三方庫Boost.Serialization來實現(xiàn)序列化和反序列化,如果不想使用第三方庫,可能需要自己實現(xiàn)序列化的功能。
下面我們一步一步完善它。
三、加強版RPC(以“RPC簡單實現(xiàn)”為基礎)
3.1 加入錯誤處理
下面是 RPCHandler 類中加入錯誤處理的代碼示例:
class RPCHandler {
public:
// 其他代碼...
std::string handleRequest(const std::string& request) {
// 反序列化請求
std::map<std::string, std::string> requestMap;
std::istringstream is(request);
boost::archive::text_iarchive ia(is);
ia >> requestMap;
// 查找并調(diào)用對應的回調(diào)函數(shù)
std::string name = requestMap["name"];
std::string args = requestMap["args"];
std::unique_lock<std::mutex> lock(mtx_);
auto it = callbacks_.find(name);
if (it == callbacks_.end()) {
return "Error: Unknown function";
}
RPCCallback callback = it->second;
try {
return callback(args);
} catch (const std::exception& e) {
return "Error: Exception occurred: " + std::string(e.what());
} catch (...) {
return "Error: Unknown exception occurred";
}
}
};上面的代碼在 RPCHandler 類的 handleRequest 函數(shù)中加入了錯誤處理的代碼,它使用了 try-catch 語句來捕獲可能發(fā)生的異常。如果找不到對應的函數(shù)或發(fā)生了異常,會返回錯誤信息。這樣,如果請求格式不正確或函數(shù)不存在,服務端將會返回相應的錯誤信息。
3.2 加入網(wǎng)絡連接(socket)
加入網(wǎng)絡連接不需要動服務端的實現(xiàn),只需要在main里創(chuàng)造套接字去鏈接就好:
int main()
{
io_context ioc;
ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080));
RPCHandler rpcHandler;
// 注冊函數(shù)
rpcHandler.registerCallback("add", [](const std::string& args) {
std::istringstream is(args);
int a, b;
is >> a >> b;
int result = a + b;
std::ostringstream os;
os << result;
return os.str();
});
rpcHandler.registerCallback("sub", [](const std::string& args) {
std::istringstream is(args);
int a, b;
is >> a >> b;
int result = a - b;
std::ostringstream os;
os << result;
return os.str();
});
// 等待連接
while (true) {
ip::tcp::socket socket(ioc);
acceptor.accept(socket);
// 創(chuàng)建線程處理請求
std::thread requestThread([&](ip::tcp::socket socket) {
while (true) {
// 讀取請求
boost::asio::streambuf buf;
read_until(socket, buf, '\n');
std::string request = boost::asio::buffer_cast<const char*>(buf.data());
request.pop_back();
// 處理請求
std::string response = rpcHandler.handleRequest(request);
// 發(fā)送響應
write(socket, buffer(response + '\n'));
}
}, std::move(socket));
requestThread.detach();
}
return 0;
}這是一個使用Boost.Asio庫實現(xiàn)的RPC服務端代碼示例。它使用了TCP協(xié)議監(jiān)聽8080端口,等待客戶端的連接。當有客戶端連接時,創(chuàng)建一個新線程來處理請求。請求和響應通過網(wǎng)絡傳輸。
3.3 加強并發(fā)性
使用并發(fā)和異步機制,忽略重復代碼,實現(xiàn)如下:
class RPCHandler {
public:
// ...
void handleConnection(ip::tcp::socket socket) {
while (true) {
// 讀取請求
boost::asio::streambuf buf;
read_until(socket, buf, '\n');
std::string request = boost::asio::buffer_cast<const char*>(buf.data());
request.pop_back();
// 使用并行執(zhí)行處理請求
std::vector<std::future<std::string>> futures;
for (int i = 0; i < request.size(); i++) {
futures.emplace_back(std::async(std::launch::async, &RPCHandler::handleRequest, this, request[i]));
}
// 等待所有請求處理完成并發(fā)送響應
for (auto& f : futures) {
std::string response = f.get();
write(socket, buffer(response + '\n'));
}
}
}
};這樣,請求會被分成多個部分并行處理,可以利用多核 CPU 的優(yōu)勢提高服務端的并發(fā)性能。
main():
int main() {
io_context ioc;
ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080));
RPCHandler rpcHandler;
// 注冊函數(shù)
rpcHandler.registerCallback("add", [](const std::string& args) {
std::istringstream is(args);
int a, b;
is >> a >> b;
int result = a + b;
std::ostringstream os;
os << result;
return os.str();
});
rpcHandler.registerCallback("sub", [](const std::string& args) {
std::istringstream is(args);
int a, b;
is >> a >> b;
int result = a - b;
std::ostringstream os;
os << result;
return os.str();
});
// 創(chuàng)建線程池
boost::thread_pool::executor pool(10);
// 等待連接
while (true) {
ip::tcp::socket socket(ioc);
acceptor.accept(socket);
// 將請求添加到線程池中處理
pool.submit(boost::bind(&RPCHandler::handleConnection, &rpcHandler, std::move(socket)));
}
return 0;
}在 main 函數(shù)中可以使用 boost::thread_pool::executor 來管理線程池,在線程池中提交任務來處理請求。這里的線程池大小設置為10,可以根據(jù)實際情況調(diào)整。
3.4 加入容錯機制(修改客戶端部分)
在其中使用了重試機制來保證客戶端能夠重新連接服務端:
class RPCClient {
public:
RPCClient(const std::string& address, int port) : address_(address), port_(port), socket_(io_context_) {
connect();
}
std::string call(const std::string& name, const std::string& args) {
// 序列化請求
std::ostringstream os;
boost::archive::text_oarchive oa(os);
std::map<std::string, std::string> request;
request["name"] = name;
request["args"] = args;
oa << request;
std::string requestStr = os.str();
// 發(fā)送請求
write(socket_, buffer(requestStr + '\n'));
// 讀取響應
boost::asio::streambuf buf;
read_until(socket_, buf, '\n');
std::string response = boost::asio::buffer_cast<const char*>(buf.data());
response.pop_back();
return response;
}
private:
void connect() {
bool connected = false;
while (!connected) {
try {
socket_.connect(ip::tcp::endpoint(ip::address::from_string(address_), port_));
connected = true;
} catch (const std::exception& e) {
std::cerr << "Error connecting to server: " << e.what() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}
std::string address_;
int port_;
io_context io_context_;
ip::tcp::socket socket_;
};在這個示例中,當連接服務端失敗時,客戶端會在一定的時間間隔后重試連接,直到成功連接上服務端為止。
以上就是C++簡單實現(xiàn)RPC網(wǎng)絡通訊的示例詳解的詳細內(nèi)容,更多關于C++實現(xiàn)RPC網(wǎng)絡通訊的資料請關注腳本之家其它相關文章!
相關文章
Windows安裝配置C/C++(VS2017)OpenSSL開發(fā)環(huán)境配置教程
這篇文章主要為大家詳細介紹了Windows安裝配置C/C++,OpenSSL開發(fā)環(huán)境配置教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-07-07
排列和組合算法的實現(xiàn)方法_C語言經(jīng)典案例
下面小編就為大家?guī)硪黄帕泻徒M合算法的實現(xiàn)方法_C語言經(jīng)典案例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-09-09

