Spring?Boot?中實現(xiàn)?WebSocket?集群思路詳解
在 Spring Boot 中實現(xiàn) WebSocket 集群處理,核心挑戰(zhàn)是解決多節(jié)點間的 WebSocket 會話共享和消息同步問題(因為單節(jié)點 WebSocket 的會話是內(nèi)存存儲的,多節(jié)點間無法直接通信)。以下是完整的實現(xiàn)方案:
一、核心思路
- 會話共享:使用分布式存儲(如 Redis)存儲所有節(jié)點的 WebSocket 會話信息(會話 ID、用戶標(biāo)識、節(jié)點標(biāo)識等)。
- 消息轉(zhuǎn)發(fā):當(dāng)某節(jié)點需要向用戶發(fā)送消息時,先通過 Redis 判斷用戶連接在哪個節(jié)點,再通過消息隊列(如 Redis Pub/Sub)將消息轉(zhuǎn)發(fā)到目標(biāo)節(jié)點,由目標(biāo)節(jié)點推送消息給用戶。
- 集群感知:每個節(jié)點啟動時注冊自身信息,下線時注銷,確保消息能正確路由。
二、技術(shù)選型
- WebSocket 框架:Spring WebSocket(基于 JSR-356 標(biāo)準(zhǔn))
- 分布式存儲:Redis(存儲會話映射、節(jié)點信息)
- 消息隊列:Redis Pub/Sub(節(jié)點間消息轉(zhuǎn)發(fā))
- 依賴管理:Spring Boot Starter WebSocket + Spring Data Redis
三、實現(xiàn)步驟
1. 引入依賴
在pom.xml中添加以下依賴:
<!-- Spring WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Redis (用于會話存儲和消息轉(zhuǎn)發(fā)) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 連接池 (可選,優(yōu)化Redis性能) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>2. 配置 Redis
在application.yml中配置 Redis 連接:
spring:
redis:
host: localhost # Redis服務(wù)器地址
port: 6379 # Redis端口
password: # 密碼(如有)
lettuce:
pool:
max-active: 8 # 最大連接數(shù)
max-idle: 8 # 最大空閑連接
min-idle: 2 # 最小空閑連接3. WebSocket 核心配置
實現(xiàn) WebSocket 配置類,注冊端點和消息處理器:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketHandler webSocketHandler;
private final HandshakeInterceptor handshakeInterceptor;
// 注入自定義處理器和攔截器
public WebSocketConfig(WebSocketHandler webSocketHandler, HandshakeInterceptor handshakeInterceptor) {
this.webSocketHandler = webSocketHandler;
this.handshakeInterceptor = handshakeInterceptor;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 注冊WebSocket端點,允許跨域
registry.addHandler(webSocketHandler, "/ws")
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins("*"); // 生產(chǎn)環(huán)境需指定具體域名
}
}4. 會話管理與集群同步
需要解決 3 個核心問題:會話注冊、消息路由、節(jié)點間消息轉(zhuǎn)發(fā)。
4.1 定義常量(Redis Key)
public class RedisKeyConstants {
// 存儲用戶-節(jié)點映射:key=userId,value=nodeId
public static final String USER_NODE_MAP = "websocket:user:node";
// 存儲節(jié)點信息:key=nodeId,value=節(jié)點信息(如IP:端口)
public static final String NODE_INFO = "websocket:node:info";
// Redis Pub/Sub頻道(用于節(jié)點間消息轉(zhuǎn)發(fā))
public static final String MSG_CHANNEL = "websocket:msg:channel";
}4.2 握手攔截器(記錄會話與用戶映射)
在連接建立時,將用戶 ID 與當(dāng)前節(jié)點 ID 關(guān)聯(lián)并存儲到 Redis:
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.servlet.http.HttpSession;
import java.util.Map;
import java.util.UUID;
@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
private final StringRedisTemplate redisTemplate;
private final String nodeId; // 當(dāng)前節(jié)點唯一標(biāo)識(如UUID)
public WebSocketHandshakeInterceptor(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
this.nodeId = UUID.randomUUID().toString(); // 節(jié)點啟動時生成唯一ID
}
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
// 從請求參數(shù)或Session中獲取用戶ID(根據(jù)業(yè)務(wù)場景調(diào)整)
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpSession session = servletRequest.getServletRequest().getSession();
String userId = (String) session.getAttribute("userId"); // 假設(shè)用戶已登錄,Session中存userId
if (userId != null) {
// 將用戶與當(dāng)前節(jié)點關(guān)聯(lián)(存儲到Redis)
redisTemplate.opsForValue().set(
RedisKeyConstants.USER_NODE_MAP + ":" + userId,
nodeId
);
// 注冊當(dāng)前節(jié)點信息(可選,用于監(jiān)控)
redisTemplate.opsForValue().set(
RedisKeyConstants.NODE_INFO + ":" + nodeId,
request.getLocalAddress().toString() // 節(jié)點地址
);
attributes.put("userId", userId); // 傳遞userId到處理器
return true;
}
return false; // 未登錄用戶拒絕連接
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {}
}4.3 WebSocket 消息處理器
處理消息接收,并通過 Redis Pub/Sub 轉(zhuǎn)發(fā)跨節(jié)點消息:
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class WebSocketHandler extends TextWebSocketHandler {
// 本地會話緩存(當(dāng)前節(jié)點的WebSocket會話)
private final Map<String, WebSocketSession> localSessions = new ConcurrentHashMap<>();
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
private final String nodeId; // 當(dāng)前節(jié)點ID(與攔截器一致)
public WebSocketHandler(StringRedisTemplate redisTemplate, ObjectMapper objectMapper,
@Value("${websocket.node.id}") String nodeId) {
this.redisTemplate = redisTemplate;
this.objectMapper = objectMapper;
this.nodeId = nodeId;
// 訂閱Redis頻道,接收其他節(jié)點的消息
subscribeToRedisChannel();
}
// 連接建立時,緩存本地會話
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String userId = (String) session.getAttributes().get("userId");
localSessions.put(userId, session);
}
// 接收客戶端消息
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String userId = (String) session.getAttributes().get("userId");
String payload = message.getPayload(); // 客戶端發(fā)送的消息
// 解析消息(假設(shè)消息格式:{"targetUserId": "xxx", "content": "xxx"})
Map<String, String> msg = objectMapper.readValue(payload, Map.class);
String targetUserId = msg.get("targetUserId");
String content = msg.get("content");
// 發(fā)送消息給目標(biāo)用戶(跨節(jié)點則轉(zhuǎn)發(fā))
sendMessageToUser(targetUserId, content);
}
// 發(fā)送消息給指定用戶
public void sendMessageToUser(String targetUserId, String content) throws IOException {
// 1. 從Redis獲取目標(biāo)用戶所在節(jié)點
String targetNodeId = redisTemplate.opsForValue().get(
RedisKeyConstants.USER_NODE_MAP + ":" + targetUserId
);
if (targetNodeId == null) {
throw new RuntimeException("用戶未連接");
}
// 2. 若目標(biāo)節(jié)點是當(dāng)前節(jié)點,直接發(fā)送;否則通過Redis轉(zhuǎn)發(fā)
if (targetNodeId.equals(nodeId)) {
WebSocketSession session = localSessions.get(targetUserId);
if (session != null && session.isOpen()) {
session.sendMessage(new TextMessage(content));
}
} else {
// 構(gòu)造跨節(jié)點消息(包含目標(biāo)用戶、內(nèi)容、目標(biāo)節(jié)點)
Map<String, String> crossMsg = new HashMap<>();
crossMsg.put("targetUserId", targetUserId);
crossMsg.put("content", content);
crossMsg.put("targetNodeId", targetNodeId);
// 發(fā)布到Redis頻道
redisTemplate.convertAndSend(
RedisKeyConstants.MSG_CHANNEL,
objectMapper.writeValueAsString(crossMsg)
);
}
}
// 訂閱Redis頻道,接收其他節(jié)點的消息并轉(zhuǎn)發(fā)給本地用戶
private void subscribeToRedisChannel() {
redisTemplate.getConnectionFactory().getConnection().subscribe(
message -> {
try {
String payload = new String(message.getBody());
Map<String, String> crossMsg = objectMapper.readValue(payload, Map.class);
// 僅處理目標(biāo)節(jié)點為當(dāng)前節(jié)點的消息
if (crossMsg.get("targetNodeId").equals(nodeId)) {
String targetUserId = crossMsg.get("targetUserId");
String content = crossMsg.get("content");
WebSocketSession session = localSessions.get(targetUserId);
if (session != null && session.isOpen()) {
session.sendMessage(new TextMessage(content));
}
}
} catch (Exception e) {
e.printStackTrace();
}
},
RedisKeyConstants.MSG_CHANNEL.getBytes()
);
}
// 連接關(guān)閉時,清理會話和Redis映射
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String userId = (String) session.getAttributes().get("userId");
localSessions.remove(userId);
// 刪除Redis中的用戶-節(jié)點映射
redisTemplate.delete(RedisKeyConstants.USER_NODE_MAP + ":" + userId);
}
}4.4 節(jié)點 ID 配置
在application.yml中配置節(jié)點 ID(也可啟動時自動生成,確保唯一):
websocket:
node:
id: ${HOSTNAME:node-${random.uuid}} # 優(yōu)先使用主機名,否則隨機生成四、集群測試
- 啟動多個 Spring Boot 實例(模擬集群節(jié)點),確保都連接到同一個 Redis。
- 客戶端 1 連接節(jié)點 A,客戶端 2 連接節(jié)點 B。
- 客戶端 1 發(fā)送消息給客戶端 2,消息流程:節(jié)點 A 接收消息,查詢 Redis 發(fā)現(xiàn)客戶端 2 在節(jié)點 B。節(jié)點 A 通過 Redis Pub/Sub 發(fā)布消息到MSG_CHANNEL。節(jié)點 B 訂閱了該頻道,接收消息并推送給客戶端 2。
五、優(yōu)化建議
- 會話過期清理:給 Redis 中的USER_NODE_MAP設(shè)置過期時間(如 30 分鐘),并通過 WebSocket 心跳機制續(xù)約。
- 消息可靠性:Redis Pub/Sub 是 fire-and-forget 模式,若需可靠消息,可替換為 RabbitMQ 或 Kafka。
- 負載均衡:前端通過 Nginx 等負載均衡器連接 WebSocket 集群(需配置proxy_set_header Upgrade $http_upgrade;等參數(shù)支持 WebSocket)。
- 監(jiān)控:通過 Redis 的NODE_INFO鍵監(jiān)控節(jié)點狀態(tài),實現(xiàn)故障轉(zhuǎn)移
到此這篇關(guān)于Spring Boot 中實現(xiàn) WebSocket 集群處理的文章就介紹到這了,更多相關(guān)Spring Boot WebSocket 集群內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis_plus基礎(chǔ)教程(總結(jié)篇)
這篇文章主要介紹了Mybatis_plus基礎(chǔ)教程(總結(jié)篇),本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09
Java?注解@PostConstruct的原理及最佳使用場景分析
@PostConstruct?是?Java?中非常實用的注解,尤其是在?Spring?等框架中,它使得開發(fā)者可以方便地在?Bean?初始化后執(zhí)行額外的操作,本文給大家介紹@PostConstruct?的原理、使用場景及最佳實踐,感興趣的朋友一起看看吧2025-04-04
關(guān)于RedisTemplate之opsForValue的使用說明
這篇文章主要介紹了關(guān)于RedisTemplate之opsForValue的使用說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06
基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(詳解)
下面小編就為大家?guī)硪黄趕pring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(詳解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06

