使用C++手搓一個TCP連接管理器
背景
無所事事的小H想起了之前抄的一個web項目,現(xiàn)在看來自己完全沒有理解其中的內(nèi)容就匆匆寫上簡歷了,決定來補一下其中組件的實現(xiàn),第一個著手的就是TCP連接管理器。
在網(wǎng)絡(luò)編程中,管理多個 TCP 連接是一項常見而復(fù)雜的任務(wù),尤其是在構(gòu)建服務(wù)端或需要雙向通信的客戶端程序時。為了簡化這一過程,我們可以封裝一個通用的TCP連接管理器,它支持:
- 建立和監(jiān)聽連接
- 異步接收和發(fā)送數(shù)據(jù)
- 管理連接生命周期
- 錯誤處理與回調(diào)機制
本文將一步步拆解這個類的設(shè)計思路與實現(xiàn)方式,代碼已經(jīng)同步至github倉庫:futureseek/Cpp_learn: learn some model about C++ ,該倉庫還包含其他各種C++實現(xiàn)的小組件。
模塊設(shè)計概述
我們封裝的TcpConnectionManager類支持以下的核心功能:
- 服務(wù)端模式:監(jiān)聽端口、接受客戶端連接
- 客戶端模式:主動連接服務(wù)器
- 統(tǒng)一的連接信息結(jié)構(gòu)體
ConnectionInfo - 線程安全的連接管理容器
- 回調(diào)機制
此外,整個實現(xiàn)遵循現(xiàn)代 C++(C++11)風(fēng)格,使用 std::shared_ptr、std::function、std::thread、std::mutex 等標(biāo)準(zhǔn)庫特性,提升可讀性和安全性。
輔助類設(shè)計
這里提供了連接狀態(tài)還有連接信息作為我們TCP連接的信息封裝,包括套接字描述符、遠(yuǎn)程地址、端口、狀態(tài)、連接時間。
// 連接狀態(tài)枚舉
enum class ConnectionState {
DISCONNECTED,
CONNECTING,
CONNECTED,
DISCONNECTING,
ERROR
};
// 連接信息結(jié)構(gòu)體
struct ConnectionInfo {
int fd; // 套接字描述符
std::string remote_address;
uint16_t remote_port;
ConnectionState state;
time_t connection_time;
};核心功能實現(xiàn)
啟動服務(wù)端監(jiān)聽
使用 socket -> setsockopt -> bind -> listen -> accept 構(gòu)建服務(wù)器套接字,并在后臺線程中不斷接受連接。
監(jiān)聽線程通過 accept() 獲取新連接,并為每個連接分配一個新的 ConnectionInfo 對象,隨后啟動獨立線程處理該連接的數(shù)據(jù)接收。
// 啟動服務(wù)器
std::error_code start(uint16_t port){
/*
創(chuàng)建監(jiān)聽套接字
AF_INET: IPv4
SOCK_STREAM: TCP
0: 默認(rèn)協(xié)議,這里默認(rèn)選擇TCP協(xié)議,顯式指定: IPPROTO_TCP,IPPROTO_UDP
*/
listen_fd_ = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd_ < 0){
return std::error_code(errno,std::generic_category());
}
// 設(shè)置SO_REUSEADDR選項,允許重用處于處于 TIME_WAIT 狀態(tài)的本地地址和端口
int opt = 1;
if(setsockopt(listen_fd_,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)) < 0){
close(listen_fd_);
return std::error_code(errno,std::generic_category());
}
// 綁定地址和端口
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY; // 監(jiān)聽所有可用接口
server_addr.sin_port = htons(port); // 轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序
if(bind(listen_fd_,(sockaddr*)&server_addr,sizeof(server_addr)) < 0){
close(listen_fd_);
return std::error_code(errno,std::generic_category());
}
// 開始監(jiān)聽
// SOMAXCONN: 系統(tǒng)允許的最大連接數(shù),linux系統(tǒng)下通常為128
if(listen(listen_fd_,SOMAXCONN)<0){
close(listen_fd_);
return std::error_code(errno,std::generic_category());
}
running_ = true;
std::thread([this](){
this->acceptConnections();
}).detach();
return std::error_code(); // 成功返回默認(rèn)構(gòu)造的錯誤碼,表示無錯誤
}
void acceptConnections(){
while(running_){
sockaddr_in client_addr{};
socklen_t client_len = sizeof(client_addr);
/*
accept 函數(shù)的第二個參數(shù) &client_addr:是一個輸出參數(shù),內(nèi)核會將客戶端的地址信息(IP、端口、地址族)填充到這個結(jié)構(gòu)體中。
*/
int client_fd = accept(listen_fd_,(struct sockaddr*)&client_addr,&client_len);
if(client_fd < 0){
if(running_){
std::error_code ec(errno,std::system_category());
handleError(-1,ec);
}
continue;
}
char ip_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET,&client_addr.sin_addr,ip_str,INET_ADDRSTRLEN);
auto info = std::make_shared<ConnectionInfo>();
info->fd = client_fd;
info->remote_address = ip_str;
info->remote_port = ntohs(client_addr.sin_port);
info->state = ConnectionState::CONNECTED;
info->connection_time = time(nullptr);
{
std::lock_guard<std::mutex> lock(connections_mutex_);
connections_[client_fd] = info;
}
if(connection_callback_){
connection_callback_(*info);
}
std::thread([this,client_fd](){
this->receiveData(client_fd);
}).detach();
}
}客戶端主動連接
通過 connect() 系統(tǒng)調(diào)用實現(xiàn)主動連接服務(wù)器,同樣在連接成功后開啟數(shù)據(jù)接收線程。
// 主動建立連接
std::error_code connect(const std::string& host,uint16_t port,std::shared_ptr<ConnectionInfo>& conn_info){
running_ = true;
int fd =socket(AF_INET,SOCK_STREAM,0);
if(fd<0){
return std::error_code(errno,std::generic_category());
}
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
// 將點分十進(jìn)制字符串轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序的二進(jìn)制形式
if(inet_pton(AF_INET,host.c_str(),&server_addr.sin_addr)<=0){
close(fd);
return std::error_code(errno,std::generic_category());
}
auto info = std::make_shared<ConnectionInfo>();
info->fd = fd;
info->remote_address = host;
info->remote_port = port;
info->state = ConnectionState::CONNECTING;
info->connection_time = time(nullptr);
{
std::lock_guard<std::mutex> lock(connections_mutex_);
connections_[fd] = info;
}
// ::conect 避免與成員函數(shù)重名
if(::connect(fd,(sockaddr*)&server_addr,sizeof(server_addr))<0){
close(fd);
return std::error_code(errno,std::generic_category());
}
info->state = ConnectionState::CONNECTED;
if(connection_callback_){
connection_callback_(*info);
}
std::thread([this,fd](){
this->receiveData(fd);
}).detach();
conn_info = info;
return std::error_code();
}數(shù)據(jù)發(fā)送
封裝 send() 調(diào)用,帶有 MSG_NOSIGNAL 防止崩潰,支持向指定連接發(fā)送任意二進(jìn)制數(shù)據(jù)。
// 發(fā)送數(shù)據(jù)
std::error_code sendData(int fd, const std::vector<uint8_t>& data) {
size_t total_sent = 0;
while (total_sent < data.size()) {
ssize_t sent = ::send(fd,
data.data() + total_sent,
data.size() - total_sent,
MSG_NOSIGNAL);
if (sent < 0) {
if (errno == EINTR) {
continue; // 被信號打斷,重試
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 對端接收緩沖區(qū)滿了,可以選擇 epoll 等待再寫
// 簡單實現(xiàn):直接返回錯誤
return std::error_code(errno, std::generic_category());
}
return std::error_code(errno, std::generic_category());
}
total_sent += static_cast<size_t>(sent);
}
return {}; // success
}異步數(shù)據(jù)接收
每個連接啟動一個獨立線程,阻塞式調(diào)用 recv() 接收數(shù)據(jù),接收到后調(diào)用數(shù)據(jù)回調(diào)函數(shù)。
處理邏輯包括:
- 正常讀取數(shù)據(jù)并回調(diào)
- 客戶端關(guān)閉連接
- 出現(xiàn)網(wǎng)絡(luò)錯誤時關(guān)閉連接并回調(diào)錯誤處理
// 接受數(shù)據(jù)
void receiveData(int fd){
std::shared_ptr<ConnectionInfo> conn;
{
std::lock_guard<std::mutex> lock(connections_mutex_);
auto it = connections_.find(fd);
if(it == connections_.end()){
return;
}
conn = it->second;
}
std::vector<uint8_t> buffer(4096);
while(running_){
ssize_t bytes_received = recv(fd,buffer.data(),buffer.size(),0);
if(bytes_received > 0){
std::vector<uint8_t> data(buffer.begin(),buffer.begin()+bytes_received);
if(data_callback_){
data_callback_(*conn,data);
}
}
else if(bytes_received == 0){
closeConnection(fd);
break;
}
else{
if(errno != EAGAIN && errno!= EWOULDBLOCK){
std::error_code ec(errno,std::system_category());
handleError(fd,ec);
closeConnection(fd);
break;
}
}
}
}錯誤處理
所有系統(tǒng)調(diào)用失敗都封裝為 std::error_code,統(tǒng)一交由 handleError 方法處理。
// 錯誤處理
void handleError(int fd,const std::error_code& ec){
if(error_callback_){
if(fd == -1){
std::lock_guard<std::mutex> lock(connections_mutex_);
auto it = connections_.find(fd);
if(it != connections_.end()){
it->second->state = ConnectionState::ERROR;
error_callback_(*(it->second),ec);
}
}
else{
ConnectionInfo info;
info.state = ConnectionState::ERROR;
error_callback_(info,ec);
}
}
}測試
再根據(jù)功能給出測試,流程如下:
1.服務(wù)器端啟動
- 設(shè)置 連接回調(diào)(
setConnectionCallback):用來檢測客戶端連接和斷開。 - 設(shè)置 數(shù)據(jù)回調(diào)(
setDataCallback):收到數(shù)據(jù)后,反轉(zhuǎn)數(shù)據(jù)并回傳(echo)。 - 啟動監(jiān)聽端口
12345。
2.客戶端啟動
- 設(shè)置 連接回調(diào):檢測和服務(wù)器的連接狀態(tài)。
- 設(shè)置 數(shù)據(jù)回調(diào):打印接收到的服務(wù)器回傳數(shù)據(jù)。
- 連接到服務(wù)器
127.0.0.1:12345。
3.數(shù)據(jù)傳輸測試
- 客戶端向服務(wù)器發(fā)送
"Hello World!"。 - 服務(wù)器收到后 反轉(zhuǎn)字符串 →
"!dlroW olleH",然后回傳給客戶端。 - 客戶端收到后打印回傳內(nèi)容。
4.連接關(guān)閉測試
- 客戶端主動關(guān)閉連接。
- 確認(rèn)客戶端和服務(wù)器都檢測到連接關(guān)閉,且服務(wù)器的連接計數(shù)為
0。
時序圖如下:

代碼
namespace TcpConnectionManager_Test {
std::mutex mutex;
std::condition_variable cv;
void test() {
TcpConnectionManager server;
TcpConnectionManager client;
// 同步工具
bool server_ready = false;
bool server_connected = false;
bool client_connected = false;
bool server_received = false; // 服務(wù)器收到(僅服務(wù)器設(shè)置)
bool client_received = false; // 客戶端收到回顯(僅客戶端設(shè)置)
std::string client_echo_str; // 保存客戶端接收到的回顯,便于斷言
// 1. 服務(wù)器初始化
server.setConnectionCallback([&](const TcpConnectionManager::ConnectionInfo& conn) {
std::lock_guard<std::mutex> lock(mutex);
if (conn.state == TcpConnectionManager::ConnectionState::CONNECTED) {
std::cout << "[Server] Connection from "
<< conn.remote_address << ":" << conn.remote_port
<< " established\n";
server_connected = true;
} else if (conn.state == TcpConnectionManager::ConnectionState::DISCONNECTED) {
std::cout << "[Server] Connection closed\n";
server_connected = false;
}
cv.notify_all();
});
server.setDataCallback([&](const TcpConnectionManager::ConnectionInfo& conn,
const std::vector<uint8_t>& data) {
std::lock_guard<std::mutex> lock(mutex);
std::cout << "[Server] Received " << data.size() << " bytes\n";
std::vector<uint8_t> echo(data.rbegin(), data.rend());
std::string echo_str(echo.begin(), echo.end());
std::cout << "[Server] Echo payload: " << echo_str << "\n";
if (auto ec = server.sendData(conn.fd, echo)) {
std::cout << "[Server] Failed to send echo: " << ec.message() << "\n";
} else {
std::cout << "[Server] Echo sent successfully\n";
}
server_received = true;
cv.notify_all();
});
// 啟動服務(wù)器
if (auto ec = server.start(12345)) {
std::cerr << "Server failed to start: " << ec.message() << "\n";
return;
}
{
std::lock_guard<std::mutex> lock(mutex);
server_ready = true;
std::cout << "[Server] Ready on port 12345\n";
cv.notify_all();
}
// 2. 客戶端初始化
std::shared_ptr<TcpConnectionManager::ConnectionInfo> client_conn;
client.setConnectionCallback([&](const TcpConnectionManager::ConnectionInfo& conn) {
std::cout << "[Client] Connection state changed to: ";
switch(conn.state) {
case TcpConnectionManager::ConnectionState::CONNECTED:
std::cout << "CONNECTED"; break;
case TcpConnectionManager::ConnectionState::DISCONNECTING:
std::cout << "DISCONNECTING"; break;
case TcpConnectionManager::ConnectionState::DISCONNECTED:
std::cout << "DISCONNECTED"; break;
case TcpConnectionManager::ConnectionState::ERROR:
std::cout << "ERROR"; break;
default:
std::cout << static_cast<int>(conn.state);
}
std::cout << "\n";
std::lock_guard<std::mutex> lock(mutex);
if (conn.state == TcpConnectionManager::ConnectionState::CONNECTED) {
std::cout << "[Client] Connected to server\n";
client_connected = true;
} else {
std::cout << "[Client] Connection state changed: "
<< static_cast<int>(conn.state) << "\n";
client_connected = false;
}
cv.notify_all();
});
client.setDataCallback([&](const TcpConnectionManager::ConnectionInfo& conn,
const std::vector<uint8_t>& data) {
std::lock_guard<std::mutex> lock(mutex);
std::string received(data.begin(), data.end());
std::cout << "[Client] Received echo: " << received << "\n";
std::cout << "[Debug-Client] receiveData() start for fd=" << conn.fd << std::endl;
std::cout << "[Debug-Client] recv() got " << data.size() << " bytes" << std::endl;
client_received = true;
cv.notify_all();
});
// 等待服務(wù)器啟動
{
std::unique_lock<std::mutex> lock(mutex);
cv.wait_for(lock, std::chrono::seconds(3), [&]{ return server_ready; });
if (!server_ready) {
std::cerr << "Timeout waiting for server to start\n";
return;
}
}
// 客戶端連接
if (auto ec = client.connect("127.0.0.1", 12345, client_conn)) {
std::cerr << "Client failed to connect: " << ec.message() << "\n";
return;
}
// 等待連接建立
{
std::unique_lock<std::mutex> lock(mutex);
if (!cv.wait_for(lock, std::chrono::seconds(3), [&]{
return server_connected && client_connected;
})) {
std::cerr << "Timeout waiting for connection to establish\n";
return;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 確保穩(wěn)定
if (server.getConnectionCount() != 1) {
std::cerr << "Connection count mismatch. Expected 1, got: "
<< server.getConnectionCount() << "\n";
return;
}
// 3. 測試數(shù)據(jù)傳輸
std::string test_message = "Hello World!";
std::vector<uint8_t> test_data(test_message.begin(), test_message.end());
if (auto ec = client.sendData(client_conn->fd, test_data)) {
std::cerr << "Failed to send data: " << ec.message() << "\n";
return;
}
std::cout << "[Client] Data sent\n";
// 等待數(shù)據(jù)回傳
{
std::unique_lock<std::mutex> lock(mutex);
if (!cv.wait_for(lock, std::chrono::seconds(3), [&]{ return client_received; })) {
std::cerr << "Timeout waiting for client to receive echo\n";
return;
}
}
// 4. 測試連接關(guān)閉
try {
client.closeConnection(client_conn->fd);
} catch (const std::exception& e) {
std::cerr << "Failed to close connection: " << e.what() << "\n";
return;
}
// 等待連接關(guān)閉
{
std::unique_lock<std::mutex> lock(mutex);
if (!cv.wait_for(lock, std::chrono::seconds(3), [&]{
return !server_connected && !client_connected;
})) {
std::cerr << "Timeout waiting for disconnection\n";
return;
}
}
if (server.getConnectionCount() != 0) {
std::cerr << "Expected 0 connections after close, got: "
<< server.getConnectionCount() << "\n";
return;
}
std::cout << "All TCP connection manager tests passed successfully!\n";
}
};
完整代碼
#ifndef CPP_LEARN_TCPCONNECTIONMANAGER_H
#define CPP_LEARN_TCPCONNECTIONMANAGER_H
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <thread>
#include <unistd.h>
#include <string>
#include <functional>
#include <unordered_map>
#include <memory>
#include <vector>
#include <mutex>
#include <system_error>
#include <condition_variable>
#include <assert.h>
class TcpConnectionManager {
public:
// 連接狀態(tài)枚舉
enum class ConnectionState {
DISCONNECTED,
CONNECTING,
CONNECTED,
DISCONNECTING,
ERROR
};
// 連接信息結(jié)構(gòu)體
struct ConnectionInfo {
int fd; // 套接字描述符
std::string remote_address;
uint16_t remote_port;
ConnectionState state;
time_t connection_time;
};
using ConnectionCallback = std::function<void(const ConnectionInfo&)>;
using DataCallback = std::function<void(const ConnectionInfo&,const std::vector<uint8_t>&)>;
using ErrorCallback = std::function<void(const ConnectionInfo&,const std::error_code&)>;
private:
std::unordered_map<int,std::shared_ptr<ConnectionInfo>> connections_;
ConnectionCallback connection_callback_;
DataCallback data_callback_;
ErrorCallback error_callback_;
int listen_fd_ = -1;
bool running_ = false;
public:
TcpConnectionManager() = default;
~TcpConnectionManager(){
stop();
}
void setConnectionCallback(ConnectionCallback cb){
connection_callback_=cb;
}
void setDataCallback(DataCallback db){
data_callback_ = db;
}
void setErrorCallback(ErrorCallback eb){
error_callback_ = eb;
}
// 啟動服務(wù)器
std::error_code start(uint16_t port){
/*
創(chuàng)建監(jiān)聽套接字
AF_INET: IPv4
SOCK_STREAM: TCP
0: 默認(rèn)協(xié)議,這里默認(rèn)選擇TCP協(xié)議,顯式指定: IPPROTO_TCP,IPPROTO_UDP
*/
listen_fd_ = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd_ < 0){
return std::error_code(errno,std::generic_category());
}
// 設(shè)置SO_REUSEADDR選項,允許重用處于處于 TIME_WAIT 狀態(tài)的本地地址和端口
int opt = 1;
if(setsockopt(listen_fd_,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)) < 0){
close(listen_fd_);
return std::error_code(errno,std::generic_category());
}
// 綁定地址和端口
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY; // 監(jiān)聽所有可用接口
server_addr.sin_port = htons(port); // 轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序
if(bind(listen_fd_,(sockaddr*)&server_addr,sizeof(server_addr)) < 0){
close(listen_fd_);
return std::error_code(errno,std::generic_category());
}
// 開始監(jiān)聽
// SOMAXCONN: 系統(tǒng)允許的最大連接數(shù),linux系統(tǒng)下通常為128
if(listen(listen_fd_,SOMAXCONN)<0){
close(listen_fd_);
return std::error_code(errno,std::generic_category());
}
running_ = true;
std::thread([this](){
this->acceptConnections();
}).detach();
return std::error_code(); // 成功返回默認(rèn)構(gòu)造的錯誤碼,表示無錯誤
}
// 停止服務(wù)器
void stop(){
running_ = false;
if(listen_fd_ != -1){
close(listen_fd_);
listen_fd_ = -1;
}
// 關(guān)閉所有連接
std::lock_guard<std::mutex> lock(connections_mutex_);
for(const auto& pair : connections_){
closeConnection(pair.first);
}
connections_.clear();
}
// 主動建立連接
std::error_code connect(const std::string& host,uint16_t port,std::shared_ptr<ConnectionInfo>& conn_info){
running_ = true;
int fd =socket(AF_INET,SOCK_STREAM,0);
if(fd<0){
return std::error_code(errno,std::generic_category());
}
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
// 將點分十進(jìn)制字符串轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序的二進(jìn)制形式
if(inet_pton(AF_INET,host.c_str(),&server_addr.sin_addr)<=0){
close(fd);
return std::error_code(errno,std::generic_category());
}
auto info = std::make_shared<ConnectionInfo>();
info->fd = fd;
info->remote_address = host;
info->remote_port = port;
info->state = ConnectionState::CONNECTING;
info->connection_time = time(nullptr);
{
std::lock_guard<std::mutex> lock(connections_mutex_);
connections_[fd] = info;
}
// ::conect 避免與成員函數(shù)重名
if(::connect(fd,(sockaddr*)&server_addr,sizeof(server_addr))<0){
close(fd);
return std::error_code(errno,std::generic_category());
}
info->state = ConnectionState::CONNECTED;
if(connection_callback_){
connection_callback_(*info);
}
std::thread([this,fd](){
this->receiveData(fd);
}).detach();
conn_info = info;
return std::error_code();
}
// 發(fā)送數(shù)據(jù)
std::error_code sendData(int fd, const std::vector<uint8_t>& data) {
size_t total_sent = 0;
while (total_sent < data.size()) {
ssize_t sent = ::send(fd,
data.data() + total_sent,
data.size() - total_sent,
MSG_NOSIGNAL);
if (sent < 0) {
if (errno == EINTR) {
continue; // 被信號打斷,重試
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 對端接收緩沖區(qū)滿了,可以選擇 epoll 等待再寫
// 簡單實現(xiàn):直接返回錯誤
return std::error_code(errno, std::generic_category());
}
return std::error_code(errno, std::generic_category());
}
total_sent += static_cast<size_t>(sent);
}
return {}; // success
}
void closeConnection(int fd) {
if (fd < 0) return;
// 半關(guān)閉寫端,告訴對方不再發(fā)數(shù)據(jù)
::shutdown(fd, SHUT_WR);
// 直接關(guān)閉,不阻塞讀
::close(fd);
{
std::lock_guard<std::mutex> lock(connections_mutex_);
connections_.erase(fd);
}
// 通知回調(diào)
if (connection_callback_) {
TcpConnectionManager::ConnectionInfo info;
info.fd = fd;
info.state = TcpConnectionManager::ConnectionState::DISCONNECTED;
connection_callback_(info);
}
}
// 獲取活動連接數(shù)
size_t getConnectionCount() const{
std::lock_guard<std::mutex> lock(connections_mutex_);
return connections_.size();
}
private:
mutable std::mutex connections_mutex_;
// 接受新連接
void acceptConnections(){
while(running_){
sockaddr_in client_addr{};
socklen_t client_len = sizeof(client_addr);
/*
accept 函數(shù)的第二個參數(shù) &client_addr:是一個輸出參數(shù),內(nèi)核會將客戶端的地址信息(IP、端口、地址族)填充到這個結(jié)構(gòu)體中。
*/
int client_fd = accept(listen_fd_,(struct sockaddr*)&client_addr,&client_len);
if(client_fd < 0){
if(running_){
std::error_code ec(errno,std::system_category());
handleError(-1,ec);
}
continue;
}
char ip_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET,&client_addr.sin_addr,ip_str,INET_ADDRSTRLEN);
auto info = std::make_shared<ConnectionInfo>();
info->fd = client_fd;
info->remote_address = ip_str;
info->remote_port = ntohs(client_addr.sin_port);
info->state = ConnectionState::CONNECTED;
info->connection_time = time(nullptr);
{
std::lock_guard<std::mutex> lock(connections_mutex_);
connections_[client_fd] = info;
}
if(connection_callback_){
connection_callback_(*info);
}
std::thread([this,client_fd](){
this->receiveData(client_fd);
}).detach();
}
}
// 接受數(shù)據(jù)
void receiveData(int fd){
std::shared_ptr<ConnectionInfo> conn;
{
std::lock_guard<std::mutex> lock(connections_mutex_);
auto it = connections_.find(fd);
if(it == connections_.end()){
return;
}
conn = it->second;
}
std::vector<uint8_t> buffer(4096);
while(running_){
ssize_t bytes_received = recv(fd,buffer.data(),buffer.size(),0);
if(bytes_received > 0){
std::vector<uint8_t> data(buffer.begin(),buffer.begin()+bytes_received);
if(data_callback_){
data_callback_(*conn,data);
}
}
else if(bytes_received == 0){
closeConnection(fd);
break;
}
else{
if(errno != EAGAIN && errno!= EWOULDBLOCK){
std::error_code ec(errno,std::system_category());
handleError(fd,ec);
closeConnection(fd);
break;
}
}
}
}
// 錯誤處理
void handleError(int fd,const std::error_code& ec){
if(error_callback_){
if(fd == -1){
std::lock_guard<std::mutex> lock(connections_mutex_);
auto it = connections_.find(fd);
if(it != connections_.end()){
it->second->state = ConnectionState::ERROR;
error_callback_(*(it->second),ec);
}
}
else{
ConnectionInfo info;
info.state = ConnectionState::ERROR;
error_callback_(info,ec);
}
}
}
};
namespace TcpConnectionManager_Test {
std::mutex mutex;
std::condition_variable cv;
void test() {
TcpConnectionManager server;
TcpConnectionManager client;
// 同步工具
bool server_ready = false;
bool server_connected = false;
bool client_connected = false;
bool server_received = false; // 服務(wù)器收到(僅服務(wù)器設(shè)置)
bool client_received = false; // 客戶端收到回顯(僅客戶端設(shè)置)
std::string client_echo_str; // 保存客戶端接收到的回顯,便于斷言
// 1. 服務(wù)器初始化
server.setConnectionCallback([&](const TcpConnectionManager::ConnectionInfo& conn) {
std::lock_guard<std::mutex> lock(mutex);
if (conn.state == TcpConnectionManager::ConnectionState::CONNECTED) {
std::cout << "[Server] Connection from "
<< conn.remote_address << ":" << conn.remote_port
<< " established\n";
server_connected = true;
} else if (conn.state == TcpConnectionManager::ConnectionState::DISCONNECTED) {
std::cout << "[Server] Connection closed\n";
server_connected = false;
}
cv.notify_all();
});
server.setDataCallback([&](const TcpConnectionManager::ConnectionInfo& conn,
const std::vector<uint8_t>& data) {
std::lock_guard<std::mutex> lock(mutex);
std::cout << "[Server] Received " << data.size() << " bytes\n";
std::vector<uint8_t> echo(data.rbegin(), data.rend());
std::string echo_str(echo.begin(), echo.end());
std::cout << "[Server] Echo payload: " << echo_str << "\n";
if (auto ec = server.sendData(conn.fd, echo)) {
std::cout << "[Server] Failed to send echo: " << ec.message() << "\n";
} else {
std::cout << "[Server] Echo sent successfully\n";
}
server_received = true;
cv.notify_all();
});
// 啟動服務(wù)器
if (auto ec = server.start(12345)) {
std::cerr << "Server failed to start: " << ec.message() << "\n";
return;
}
{
std::lock_guard<std::mutex> lock(mutex);
server_ready = true;
std::cout << "[Server] Ready on port 12345\n";
cv.notify_all();
}
// 2. 客戶端初始化
std::shared_ptr<TcpConnectionManager::ConnectionInfo> client_conn;
client.setConnectionCallback([&](const TcpConnectionManager::ConnectionInfo& conn) {
std::cout << "[Client] Connection state changed to: ";
switch(conn.state) {
case TcpConnectionManager::ConnectionState::CONNECTED:
std::cout << "CONNECTED"; break;
case TcpConnectionManager::ConnectionState::DISCONNECTING:
std::cout << "DISCONNECTING"; break;
case TcpConnectionManager::ConnectionState::DISCONNECTED:
std::cout << "DISCONNECTED"; break;
case TcpConnectionManager::ConnectionState::ERROR:
std::cout << "ERROR"; break;
default:
std::cout << static_cast<int>(conn.state);
}
std::cout << "\n";
std::lock_guard<std::mutex> lock(mutex);
if (conn.state == TcpConnectionManager::ConnectionState::CONNECTED) {
std::cout << "[Client] Connected to server\n";
client_connected = true;
} else {
std::cout << "[Client] Connection state changed: "
<< static_cast<int>(conn.state) << "\n";
client_connected = false;
}
cv.notify_all();
});
client.setDataCallback([&](const TcpConnectionManager::ConnectionInfo& conn,
const std::vector<uint8_t>& data) {
std::lock_guard<std::mutex> lock(mutex);
std::string received(data.begin(), data.end());
std::cout << "[Client] Received echo: " << received << "\n";
std::cout << "[Debug-Client] receiveData() start for fd=" << conn.fd << std::endl;
std::cout << "[Debug-Client] recv() got " << data.size() << " bytes" << std::endl;
client_received = true;
cv.notify_all();
});
// 等待服務(wù)器啟動
{
std::unique_lock<std::mutex> lock(mutex);
cv.wait_for(lock, std::chrono::seconds(3), [&]{ return server_ready; });
if (!server_ready) {
std::cerr << "Timeout waiting for server to start\n";
return;
}
}
// 客戶端連接
if (auto ec = client.connect("127.0.0.1", 12345, client_conn)) {
std::cerr << "Client failed to connect: " << ec.message() << "\n";
return;
}
// 等待連接建立
{
std::unique_lock<std::mutex> lock(mutex);
if (!cv.wait_for(lock, std::chrono::seconds(3), [&]{
return server_connected && client_connected;
})) {
std::cerr << "Timeout waiting for connection to establish\n";
return;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 確保穩(wěn)定
if (server.getConnectionCount() != 1) {
std::cerr << "Connection count mismatch. Expected 1, got: "
<< server.getConnectionCount() << "\n";
return;
}
// 3. 測試數(shù)據(jù)傳輸
std::string test_message = "Hello World!";
std::vector<uint8_t> test_data(test_message.begin(), test_message.end());
if (auto ec = client.sendData(client_conn->fd, test_data)) {
std::cerr << "Failed to send data: " << ec.message() << "\n";
return;
}
std::cout << "[Client] Data sent\n";
// 等待數(shù)據(jù)回傳
{
std::unique_lock<std::mutex> lock(mutex);
if (!cv.wait_for(lock, std::chrono::seconds(3), [&]{ return client_received; })) {
std::cerr << "Timeout waiting for client to receive echo\n";
return;
}
}
// 4. 測試連接關(guān)閉
try {
client.closeConnection(client_conn->fd);
} catch (const std::exception& e) {
std::cerr << "Failed to close connection: " << e.what() << "\n";
return;
}
// 等待連接關(guān)閉
{
std::unique_lock<std::mutex> lock(mutex);
if (!cv.wait_for(lock, std::chrono::seconds(3), [&]{
return !server_connected && !client_connected;
})) {
std::cerr << "Timeout waiting for disconnection\n";
return;
}
}
if (server.getConnectionCount() != 0) {
std::cerr << "Expected 0 connections after close, got: "
<< server.getConnectionCount() << "\n";
return;
}
std::cout << "All TCP connection manager tests passed successfully!\n";
}
};
#endif //CPP_LEARN_TCPCONNECTIONMANAGER_H結(jié)語
本次調(diào)試還是遇到了各種奇怪的問題,寫起來的時候感覺還好,真到了測試的時候又出現(xiàn)各種順序上的問題,尤其要注意各種資源釋放順序是否會引發(fā)互斥或者提前釋放了。
到此這篇關(guān)于使用C++手搓一個TCP連接管理器的文章就介紹到這了,更多相關(guān)C++ TCP連接管理器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C++實現(xiàn)CreatThread函數(shù)主線程與工作線程交互的方法
這篇文章主要介紹了C++實現(xiàn)CreatThread函數(shù)主線程與工作線程交互的方法,是Windows應(yīng)用程序設(shè)計中非常實用的方法,需要的朋友可以參考下2014-10-10
C++實現(xiàn)LeetCode(126.詞語階梯之二)
這篇文章主要介紹了C++實現(xiàn)LeetCode(126.詞語階梯之二),本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-07-07
C語言實現(xiàn)飛機大戰(zhàn)程序設(shè)計
這篇文章主要為大家詳細(xì)介紹了C語言實現(xiàn)飛機大戰(zhàn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-06-06

