springboot websocket集群(stomp協(xié)議)連接時候傳遞參數(shù)
最近在公司項(xiàng)目中接到個需求。就是后臺跟前端瀏覽器要保持長連接,后臺主動往前臺推數(shù)據(jù)。
網(wǎng)上查了下,websocket stomp協(xié)議處理這個很簡單。尤其是跟springboot 集成。
但是由于開始是單機(jī)玩的,很順利。
但是后面部署到生產(chǎn)搞集群的話,就會出問題了。
假如集群兩個節(jié)點(diǎn),瀏覽器A與節(jié)點(diǎn)A建立連接,A節(jié)點(diǎn)發(fā)的消息瀏覽器A節(jié)點(diǎn)肯定能收到。但是B節(jié)點(diǎn)由于沒有跟瀏覽器A建立連接。B節(jié)點(diǎn)發(fā)的消息瀏覽器就收不到了。
網(wǎng)上也查了好多,但是沒有一個說的很清楚的,也很多都是理論層面的。
還有很多思路都是通過session獲取信息的。但是這都不是我需要的。我需要的是從前臺傳遞參數(shù),連接的時候每個節(jié)點(diǎn)保存下。然后通過SimpleUserRegistry.getUser獲取。
話不多說,直接上代碼。
<script type="text/javascript" src="${request.contextPath}/scripts/sockjs.min.js"></script>
<script type="text/javascript" src="${request.contextPath}/scripts/stomp.min.js"></script>
var WEB_SOCKET = {
topic : "",
url : "",
stompClient : null,
connect : function(url, topic, callback,userid) {
this.url = url;
this.topic = topic;
var socket = new SockJS(url); //連接SockJS的endpoint名稱為"endpointOyzc"
WEB_SOCKET.stompClient = Stomp.over(socket);//使用STMOP子協(xié)議的WebSocket客戶端
WEB_SOCKET.stompClient.connect({userid:userid},function(frame){//連接WebSocket服務(wù)端
// console.log('Connected:' + frame);
//通過stompClient.subscribe訂閱/topic/getResponse 目標(biāo)(destination)發(fā)送的消息
WEB_SOCKET.stompClient.subscribe(topic, callback);
});
}
};
這是響應(yīng)的前端代碼。只需要引入兩個js。調(diào)用new SockJS(url) 就代表跟服務(wù)器建立連接了。
@Configuration
//注解開啟使用STOMP協(xié)議來傳輸基于代理(message broker)的消息,這時控制器支持使用@MessageMapping,就像使用@RequestMapping一樣
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Autowired
private GetHeaderParamInterceptor getHeaderParamInterceptor;
@Override
//注冊STOMP協(xié)議的節(jié)點(diǎn)(endpoint),并映射指定的url
public void registerStompEndpoints(StompEndpointRegistry registry) {
//注冊一個STOMP的endpoint,并指定使用SockJS協(xié)議
registry.addEndpoint("/endpointOyzc")
.setAllowedOrigins("*")
.withSockJS();
/* registry.addEndpoint("/endpointOyzc")
.setAllowedOrigins("*")
.setHandshakeHandler(xlHandshakeHandler)
.withSockJS();*/
}
@Override
//配置消息代理(Message Broker)
public void configureMessageBroker(MessageBrokerRegistry registry) {
//點(diǎn)對點(diǎn)應(yīng)配置一個/user消息代理,廣播式應(yīng)配置一個/topic消息代理
registry.enableSimpleBroker("/topic", "/user");
// 全局使用的消息前綴(客戶端訂閱路徑上會體現(xiàn)出來)
//registry.setApplicationDestinationPrefixes("/app");
//點(diǎn)對點(diǎn)使用的訂閱前綴(客戶端訂閱路徑上會體現(xiàn)出來),不設(shè)置的話,默認(rèn)也是/user/
registry.setUserDestinationPrefix("/user");
}
/**
* 采用自定義攔截器,獲取connect時候傳遞的參數(shù)
*
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(getHeaderParamInterceptor);
}
}
注:上面的endpointOyzc就是前端的url。后面注冊端點(diǎn),前臺鏈接。
然后注意下configureClientInboundChannel這個方法,這個方法里面注入攔截器就是為了鏈接時候接收參數(shù)的。
/**
* @author : hao
* @description : websocket建立鏈接的時候獲取headeri里認(rèn)證的參數(shù)攔截器。
* @time : 2019/7/3 20:42
*/
@Component
public class GetHeaderParamInterceptor extends ChannelInterceptorAdapter {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
if (raw instanceof Map) {
Object name = ((Map) raw).get("userid");
if (name instanceof LinkedList) {
// 設(shè)置當(dāng)前訪問的認(rèn)證用戶
accessor.setUser(new JqxxPrincipal(((LinkedList) name).get(0).toString()));
}
}
}
return message;
}
}
/**
* @author : hao
* @description : 自定義的java.security.Principal
* @time : 2019/7/3 20:42
*/
public class JqxxPrincipal implements Principal {
private String loginName;
public JqxxPrincipal(String loginName) {
this.loginName = loginName;
}
@Override
public String getName() {
return loginName;
}
}
這樣就存入的前臺傳的參數(shù)。
后臺發(fā)消息的時候怎么發(fā)呢?
/**
* @author : hao
* @description : websocket發(fā)送代理,負(fù)責(zé)發(fā)送消息
* @time : 2019/7/4 11:01
*/
@Component
@Slf4j
public class WebsocketSendProxy<T> {
@Autowired
private SimpMessagingTemplate template;
@Autowired
private SimpUserRegistry userRegistry;
@Resource(name = "redisServiceImpl")
private RedisService redisService;
@Value("spring.redis.message.topic-name")
private String topicName;
public void sendMsg(RedisWebsocketMsg<T> redisWebsocketMsg) {
SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
log.info("發(fā)送消息前獲取接收方為{},根據(jù)Registry獲取本節(jié)點(diǎn)上這個用戶{}", redisWebsocketMsg.getReceiver(), simpUser);
if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
//2. 獲取WebSocket客戶端的訂閱地址
WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
if (channelEnum != null) {
//3. 給WebSocket客戶端發(fā)送消息
template.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
}
} else {
//給其他訂閱了主題的節(jié)點(diǎn)發(fā)消息,因?yàn)楸竟?jié)點(diǎn)沒有
redisService.convertAndSend(topicName, redisWebsocketMsg);
}
}
}
可以發(fā)現(xiàn)上面代碼利用了redis監(jiān)聽模型,也就是redis模型的消息隊(duì)列
/**
* @author : hao
* @description : redis消息監(jiān)聽實(shí)現(xiàn)類,接收處理類
* @time : 2019/7/3 14:00
*/
@Component
@Slf4j
public class MessageReceiver {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
/**
* 處理WebSocket消息
*/
public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
log.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
//1. 取出用戶名并判斷是否連接到當(dāng)前應(yīng)用節(jié)點(diǎn)的WebSocket
SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
//2. 獲取WebSocket客戶端的訂閱地址
WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
if (channelEnum != null) {
//3. 給WebSocket客戶端發(fā)送消息
messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
}
}
}
}
redis消息模型只貼部分代碼就好了
/**
* 消息監(jiān)聽器
*/
@Bean
MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
//消息接收者以及對應(yīng)的默認(rèn)處理方法
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
//消息的反序列化方式
messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
return messageListenerAdapter;
}
/**
* message listener container
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory
, MessageListenerAdapter messageListenerAdapter){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//添加消息監(jiān)聽器
container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));
return container;
}
上面的思路大體如下:客戶端簡歷鏈接時候,傳過來userid保存起來。發(fā)消息的時候 通過userRegistry獲取,能獲取到就證明是跟本節(jié)點(diǎn)建立的鏈接,直接用本節(jié)點(diǎn)發(fā)消息就好了。
如果不是就利用redis消息隊(duì)列,把消息推出去。每個節(jié)點(diǎn)去判斷獲取看下是不是本節(jié)點(diǎn)的userid。這樣就實(shí)現(xiàn)了集群的部署。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java后端Tomcat實(shí)現(xiàn)WebSocket實(shí)例教程
WebSocket protocol 是HTML5一種新的協(xié)議。它實(shí)現(xiàn)了瀏覽器與服務(wù)器全雙工通信(full-duplex)。一開始的握手需要借助HTTP請求完成握手。本文給大家介紹Java后端Tomcat實(shí)現(xiàn)WebSocket實(shí)例教程,感興趣的朋友一起學(xué)習(xí)吧2016-05-05
springBoot Junit測試用例出現(xiàn)@Autowired不生效的解決
這篇文章主要介紹了springBoot Junit測試用例出現(xiàn)@Autowired不生效的解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
CodeGPT + IDEA + DeepSeek如何在IDEA中引入DeepS
文章介紹了如何在IDEA中使用CodeGPT和DeepSeek插件實(shí)現(xiàn)AI智能開發(fā),具體內(nèi)容包括安裝步驟、配置APIkey和參數(shù)設(shè)置等,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧2025-02-02
關(guān)于springcloud報(bào)錯報(bào)UnsatisfiedDependencyException的問題
這篇文章主要介紹了關(guān)于springcloud報(bào)錯報(bào)UnsatisfiedDependencyException的問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11
Java基礎(chǔ)入門篇之邏輯控制練習(xí)題與猜數(shù)字游戲
猜數(shù)字游戲是一款經(jīng)典的游戲,該游戲說簡單也很簡單,說不簡單確實(shí)也很難,這篇文章主要給大家介紹了關(guān)于Java基礎(chǔ)入門篇之邏輯控制練習(xí)題與猜數(shù)字游戲的相關(guān)資料,需要的朋友可以參考下2023-06-06
java代碼實(shí)現(xiàn)斗地主發(fā)牌功能
這篇文章主要介紹了java實(shí)現(xiàn)斗地主發(fā)牌功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2020-11-11
MyBatis-Plus中靜態(tài)工具Db的多種用法及實(shí)例分析
本文將詳細(xì)講解MyBatis-Plus中靜態(tài)工具Db的各種用法,并結(jié)合具體案例進(jìn)行演示和說明,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2025-03-03

