基于SpringBoot實(shí)現(xiàn)多線程多主機(jī)TCP通信
下面我將介紹如何使用 Spring Boot 實(shí)現(xiàn)多線程、多主機(jī)的 TCP 通信,包括發(fā)送數(shù)據(jù)和接收應(yīng)答并解析。
1. 項(xiàng)目結(jié)構(gòu)
src/main/java/com/example/tcpdemo/
├── config
│ └── TcpClientConfig.java
├── controller
│ └── TcpController.java
├── handler
│ ├── TcpClientHandler.java
│ └── TcpResponseHandler.java
├── model
│ └── TcpHost.java
├── service
│ ├── TcpClientService.java
│ └── impl
│ └── TcpClientServiceImpl.java
└── TcpDemoApplication.java
2. 核心代碼實(shí)現(xiàn)
2.1 配置類
// TcpClientConfig.java
@Configuration
public class TcpClientConfig {
@Value("${tcp.client.thread-pool-size:10}")
private int threadPoolSize;
@Bean
public ExecutorService tcpClientExecutor() {
return Executors.newFixedThreadPool(threadPoolSize);
}
}
2.2 TCP主機(jī)模型
// TcpHost.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TcpHost {
private String host;
private int port;
private String name; // 主機(jī)標(biāo)識(shí)名稱
}
2.3 TCP客戶端處理器
// TcpClientHandler.java
@Component
public class TcpClientHandler {
@Autowired
private ExecutorService tcpClientExecutor;
@Autowired
private TcpResponseHandler responseHandler;
public void sendToMultipleHosts(List<TcpHost> hosts, String message) {
hosts.forEach(host -> {
tcpClientExecutor.execute(() -> {
try (Socket socket = new Socket(host.getHost(), host.getPort());
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
// 發(fā)送數(shù)據(jù)
out.println(message);
System.out.println("Sent to " + host.getName() + ": " + message);
// 接收響應(yīng)
String response = in.readLine();
responseHandler.handleResponse(host, response);
} catch (IOException e) {
System.err.println("Error communicating with " + host.getName() + ": " + e.getMessage());
}
});
});
}
}
2.4 響應(yīng)處理器
// TcpResponseHandler.java
@Component
public class TcpResponseHandler {
public void handleResponse(TcpHost host, String response) {
// 這里實(shí)現(xiàn)你的響應(yīng)解析邏輯
System.out.println("Received from " + host.getName() + ": " + response);
// 示例解析邏輯
if (response != null) {
// 假設(shè)響應(yīng)格式為 "status|data"
String[] parts = response.split("\|");
if (parts.length == 2) {
String status = parts[0];
String data = parts[1];
System.out.println("Parsed response - Status: " + status + ", Data: " + data);
}
}
}
}
2.5 TCP服務(wù)接口
// TcpClientService.java
public interface TcpClientService {
void sendToHosts(List<TcpHost> hosts, String message);
}
2.6 TCP服務(wù)實(shí)現(xiàn)
// TcpClientServiceImpl.java
@Service
public class TcpClientServiceImpl implements TcpClientService {
@Autowired
private TcpClientHandler tcpClientHandler;
@Override
public void sendToHosts(List<TcpHost> hosts, String message) {
tcpClientHandler.sendToMultipleHosts(hosts, message);
}
}
2.7 控制器
// TcpController.java
@RestController
@RequestMapping("/api/tcp")
public class TcpController {
@Autowired
private TcpClientService tcpClientService;
@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody Map<String, Object> request) {
List<Map<String, Object>> hostsInfo = (List<Map<String, Object>>) request.get("hosts");
String message = (String) request.get("message");
List<TcpHost> hosts = hostsInfo.stream()
.map(info -> new TcpHost(
(String) info.get("host"),
(Integer) info.get("port"),
(String) info.get("name")))
.collect(Collectors.toList());
tcpClientService.sendToHosts(hosts, message);
return ResponseEntity.ok("Messages sent successfully");
}
}
3. 應(yīng)用配置
在 application.properties或 application.yml中添加配置:
# TCP客戶端線程池大小 tcp.client.thread-pool-size=20 # 服務(wù)器端口 server.port=8080
4. 使用示例
4.1 啟動(dòng)Spring Boot應(yīng)用
// TcpDemoApplication.java
@SpringBootApplication
public class TcpDemoApplication {
public static void main(String[] args) {
SpringApplication.run(TcpDemoApplication.class, args);
}
}
4.2 發(fā)送請(qǐng)求
使用Postman或curl發(fā)送POST請(qǐng)求:
curl -X POST http://localhost:8080/api/tcp/send \
-H "Content-Type: application/json" \
-d '{
"hosts": [
{"host": "127.0.0.1", "port": 12345, "name": "Server1"},
{"host": "127.0.0.1", "port": 12346, "name": "Server2"}
],
"message": "Hello TCP Server"
}'
5. 高級(jí)功能擴(kuò)展
5.1 連接池管理
// 可以擴(kuò)展為使用連接池管理TCP連接
@Component
public class TcpConnectionPool {
private Map<String, Socket> connectionPool = new ConcurrentHashMap<>();
public Socket getConnection(TcpHost host) throws IOException {
String key = host.getHost() + ":" + host.getPort();
if (!connectionPool.containsKey(key) || connectionPool.get(key).isClosed()) {
connectionPool.put(key, new Socket(host.getHost(), host.getPort()));
}
return connectionPool.get(key);
}
public void closeAll() {
connectionPool.values().forEach(socket -> {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
});
connectionPool.clear();
}
}
5.2 超時(shí)設(shè)置
// 在TcpClientHandler中添加超時(shí)設(shè)置
public void sendToMultipleHosts(List<TcpHost> hosts, String message) {
hosts.forEach(host -> {
tcpClientExecutor.execute(() -> {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(host.getHost(), host.getPort()), 5000); // 5秒連接超時(shí)
socket.setSoTimeout(10000); // 10秒讀取超時(shí)
try (PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
out.println(message);
System.out.println("Sent to " + host.getName() + ": " + message);
String response = in.readLine();
responseHandler.handleResponse(host, response);
}
} catch (SocketTimeoutException e) {
System.err.println("Timeout when communicating with " + host.getName());
} catch (IOException e) {
System.err.println("Error communicating with " + host.getName() + ": " + e.getMessage());
}
});
});
}
5.3 自定義協(xié)議解析
// 擴(kuò)展TcpResponseHandler實(shí)現(xiàn)更復(fù)雜的協(xié)議解析
public void handleResponse(TcpHost host, String response) {
try {
// 示例:解析JSON格式響應(yīng)
JSONObject jsonResponse = new JSONObject(response);
String status = jsonResponse.getString("status");
String data = jsonResponse.getString("data");
long timestamp = jsonResponse.getLong("timestamp");
System.out.printf("Response from %s - Status: %s, Data: %s, Time: %tF %<tT%n",
host.getName(), status, data, new Date(timestamp));
} catch (JSONException e) {
System.err.println("Invalid response format from " + host.getName());
}
}
6. 注意事項(xiàng)
- ??線程安全??:確保在多線程環(huán)境下共享資源的線程安全
- ??資源釋放??:正確關(guān)閉Socket、流等資源
- ??異常處理??:合理處理各種網(wǎng)絡(luò)異常
- ??性能優(yōu)化??:根據(jù)實(shí)際需求調(diào)整線程池大小
- ??日志記錄??:添加詳細(xì)的日志記錄以便排查問(wèn)題
這個(gè)實(shí)現(xiàn)提供了基本的TCP多線程通信框架,你可以根據(jù)實(shí)際需求進(jìn)行擴(kuò)展和優(yōu)化。
到此這篇關(guān)于基于SpringBoot實(shí)現(xiàn)多線程多主機(jī)TCP通信的文章就介紹到這了,更多相關(guān)SpringBoot TCP通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Java8?CompletableFuture的并行處理用法
Java8中有一個(gè)工具非常有用,那就是CompletableFuture,本章主要講解CompletableFuture的并行處理用法,感興趣的小伙伴可以了解一下2022-04-04
淺談Mybatis+mysql 存儲(chǔ)Date類型的坑
這篇文章主要介紹了淺談Mybatis+mysql 存儲(chǔ)Date類型的坑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-11-11
SpringRetry重試機(jī)制之@Retryable注解與重試策略詳解
本文將詳細(xì)介紹SpringRetry的重試機(jī)制,特別是@Retryable注解的使用及各種重試策略的配置,幫助開(kāi)發(fā)者構(gòu)建更加健壯的應(yīng)用程序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-04-04
Spring-Security實(shí)現(xiàn)登錄接口流程
Security?是?Spring?家族中的一個(gè)安全管理框架,SpringSecurity的原理其實(shí)就是一個(gè)過(guò)濾器鏈,內(nèi)部包含了提供各種功能的過(guò)濾器,這篇文章主要介紹了Spring-Security實(shí)現(xiàn)登錄接口,需要的朋友可以參考下2023-05-05
Spring純注解開(kāi)發(fā)模式讓開(kāi)發(fā)簡(jiǎn)化更簡(jiǎn)化
Spring3.0引入了純注解開(kāi)發(fā)的模式,框架的誕生是為了簡(jiǎn)化開(kāi)發(fā),那注解開(kāi)發(fā)就是簡(jiǎn)化再簡(jiǎn)化。Spring的特性在整合MyBatis方面體現(xiàn)的淋漓盡致哦2022-08-08
java多線程累加計(jì)數(shù)的實(shí)現(xiàn)方法
在多線程協(xié)作任務(wù)中,如何計(jì)算也是很重的,這篇文章主要介紹了java多線程累加計(jì)數(shù)的實(shí)現(xiàn)方法,感興趣的朋友可以了解一下2021-05-05
解析Java編程中設(shè)計(jì)模式的開(kāi)閉原則的運(yùn)用
這篇文章主要介紹了解析Java編程中設(shè)計(jì)模式的開(kāi)閉原則的運(yùn)用,開(kāi)閉原則多應(yīng)用于Java程序的擴(kuò)展開(kāi)發(fā)方面,需要的朋友可以參考下2016-02-02
java多線程編程之使用thread類創(chuàng)建線程
在Java中創(chuàng)建線程有兩種方法:使用Thread類和使用Runnable接口。在使用Runnable接口時(shí)需要建立一個(gè)Thread實(shí)例2014-01-01

