SpringBoot整合WebSocket的客戶端和服務(wù)端的實現(xiàn)代碼
本文是項目中使用了websocket進(jìn)行一些數(shù)據(jù)的推送,對比項目做了一個demo,ws的相關(guān)問題不做細(xì)數(shù),僅做一下記錄。
此demo針對ws的搭建主要邏輯背景是一個服務(wù)端B:通訊層 產(chǎn)生消息推送出去,另外一個項目A充當(dāng)客戶端和服務(wù)端,A的客戶端:是接收通訊層去無差別接收這些消息,A的服務(wù)端:根據(jù)地址ip去訂閱。用戶通過訂閱A的ws,同時記錄下自己的信息,項目B推送的消息,項目A接收到之后通過當(dāng)初訂閱的邏輯和一些權(quán)限過濾條件對項目B產(chǎn)生的消息進(jìn)行過濾再推送到用戶客戶端上。
一、項目中服務(wù)端的創(chuàng)建
首先引入maven倉庫
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>websocket的服務(wù)端搭建
同時注意springboot要開啟ws服務(wù)
啟動類加上@EnableScheduling
簡要解讀demo
/webSocket/{id}:鏈接的id是業(yè)務(wù)上的一個id,這邊之前做過類似拍賣的,相當(dāng)于一個服務(wù)端或者業(yè)務(wù)上的一個標(biāo)識,是客戶端指明鏈接到哪一個拍賣間的標(biāo)識
@ServerEndpoint:作為服務(wù)端的注解。
package com.ghh.myproject.websocket;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{id}")
@Component
public class WebSocket {
private Logger log = LoggerFactory.getLogger(WebSocket.class);
private static int onlineCount = 0;
/** 創(chuàng)建一個map存放 產(chǎn)生的ws鏈接推送 */
private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
/** 創(chuàng)建一個map存放 當(dāng)前接入的客戶端 */
private static Map<String, String> idMap = new ConcurrentHashMap<>();
private Session session;
/** 鏈接進(jìn)入的一個場景id */
private String id;
/** 每一個鏈接的一個唯一標(biāo)識 */
private String userNo;
/**
* @Description: 第三方文接入當(dāng)前項目websocket后的記錄信息
* @DateTime: 2021/7/5 10:02
* @Author: GHH
* @Params: [id, session]
* @Return void
*/
@OnOpen
public void onOpen(@PathParam("id") String id, Session session) throws IOException {
log.info("已連接到id:{}競拍場,當(dāng)前競拍場人數(shù):{}", id, getUserNosById(id).size());
this.id = id;
this.session = session;
// 生成一個隨機(jī)序列號來存儲一個id下的所有用戶
this.userNo = UUID.fastUUID().toString();
addOnlineCount();
//根據(jù)隨機(jī)序列號存儲一個socket連接
clients.put(userNo, this);
idMap.put(userNo, id);
}
/**
* @Description: 關(guān)閉連接
* @DateTime: 2021/7/5 10:02
* @Author: GHH
* @Params: []
* @Return void
*/
@OnClose
public void onClose() throws IOException {
clients.remove(userNo);
idMap.remove(userNo);
subOnlineCount();
}
/**
* @Description: 客戶端發(fā)送消息調(diào)用此方法
* @DateTime: 2021/6/16 15:35
* @Author: GHH
* @Params: [message]
* @Return void
*/
@OnMessage
public void onMessage(String message) throws IOException {
// JSONObject jsonTo = JSONObject.parseObject(message);
// String mes = (String) jsonTo.get("message");
// if (!("All").equals(jsonTo.get("To"))) {
// sendMessageTo(mes, jsonTo.get("To").toString());
// } else {
// sendMessageAll(message);
// }
log.info("onMessage方法成功");
}
@OnError
public void onError(Session session, Throwable error) {
log.error("{}", error);
}
public static void sendMessageTo(String message, String userNo) throws IOException {
// session.getBasicRemote().sendText(message);
//session.getAsyncRemote().sendText(message);
WebSocket webSocket = clients.get(userNo);
if (webSocket != null && webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
}
}
/**
* @Description: 推送到指定的id值的記錄
* @DateTime: 2021/6/15 17:11
* @Author: GHH
* @Params: [message, id]
* @Return void
*/
public static void sendMessageToById(String message, String id) {
// session.getBasicRemote().sendText(message);
//session.getAsyncRemote().sendText(message);
//根據(jù)id獲取所有的userNo鏈接的用戶
List<String> userNos = getUserNosById(id);
for (WebSocket item : clients.values()) {
//遍歷鏈接的value值,如果當(dāng)前傳入的id中鏈接的用戶包含value值,則推送。
if (userNos.contains(item.userNo)) {
item.session.getAsyncRemote().sendText(message);
}
}
}
/**
* @Description: 推送所有開啟的信息
* @DateTime: 2021/6/15 17:13
* @Author: GHH
* @Params: [message]
* @Return void
*/
public static void sendMessageAll(String message){
for (WebSocket item : clients.values()) {
item.session.getAsyncRemote().sendText(message);
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocket.onlineCount--;
}
public static synchronized Map<String, WebSocket> getClients() {
return clients;
}
/**
* @Description: 根據(jù)相應(yīng)場景的一些邏輯處理
* @DateTime: 2021/7/5 10:03
* @Author: GHH
* @Params: [id]
* @Return java.util.List<java.lang.String>
*/
public static List<String> getUserNosById(String id) {
ArrayList<String> userNos = new ArrayList<>();
for (Map.Entry<String, String> entry : idMap.entrySet()) {
if (entry.getValue().equals(id)) {
userNos.add(entry.getKey());
}
}
return userNos;
}
}demo中模擬的是定時器推送,第一個參數(shù)是消息內(nèi)容,第二個是推送到哪一個拍賣間或者其他業(yè)務(wù)上的內(nèi)容。方法的具體內(nèi)容上一段代碼有詳細(xì)解釋,有通過id,或者發(fā)送給全部ws鏈接的客戶端
WebSocket.sendMessageToById(""+count,2+"");@Scheduled(cron = "*/5 * * * * ?")
public void job1(){
log.info("測試生成次數(shù):{}",count);
redisTemplate.opsForValue().set("測試"+count, ""+count++);
if (count%2==0){
WebSocket.sendMessageToById(""+count,2+"");
}else {
WebSocket.sendMessageToById(""+count,1+"");
}
log.info("websocket發(fā)送"+count);
}二、java充當(dāng)客戶端鏈接ws
上述是java作為ws服務(wù)端推送當(dāng)前業(yè)務(wù)信息的一個demo。我們項目目前做的是一個通訊層的概念,只能夠推送數(shù)據(jù)內(nèi)容,卻無法根據(jù)用戶權(quán)限去推送不同的數(shù)據(jù)。
ws客戶端的搭建,首先鏈接ws服務(wù)端。首先是我們另外一個服務(wù)的ws配置信息,我這邊demo是模擬鏈接上面的ws服務(wù)
1、ws客戶端的配置
package com.ghh.websocketRecive.wsMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.net.URI;
/**
* @author ghh
* @date 2019-08-16 16:02
*/
@Component
@Slf4j
public class WSClient {
public static Session session;
public static void startWS() {
try {
if (WSClient.session != null) {
WSClient.session.close();
}
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
//設(shè)置消息大小最大為10M
container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024);
container.setDefaultMaxTextMessageBufferSize(10*1024*1024);
// 客戶端,開啟服務(wù)端websocket。
String uri = "ws://192.168.0.108:8082/webSocket/1";
Session session = container.connectToServer(WSHandler.class, URI.create(uri));
WSClient.session = session;
} catch (Exception ex) {
log.info(ex.getMessage());
}
}
}2、配置信息需要在項目啟動的時候去啟用和鏈接ws服務(wù)
package com.ghh.websocketRecive;
import com.ghh.websocketRecive.wsMessage.WSClient;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PostConstruct;
@Slf4j
@EnableScheduling
@SpringBootApplication
@MapperScan("com.ghh.websocketRecive.dao")
public class WebsocketReciveApplication {
public static void main(String[] args) {
SpringApplication.run(WebsocketReciveApplication.class, args);
}
@PostConstruct
public void init(){
log.info("初始化應(yīng)用程序"); // 初始化ws,鏈接服務(wù)端
WSClient.startWS();
}
}3、接收服務(wù)端推送的消息進(jìn)行權(quán)限過濾demo
@ClientEndpoint:作為ws的客戶端注解,@OnMessage接收服務(wù)端推送的消息。
package com.ghh.websocketRecive.wsMessage;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ghh.websocketRecive.entity.Student;
import com.ghh.websocketRecive.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import java.util.Objects;
import java.util.Set;
import static com.ghh.websocketRecive.wsMessage.WSClient.startWS;
@ClientEndpoint
@Slf4j
@Component
public class WSHandler {
@Autowired
RedisTemplate<String,String> redisTemplate;
private static RedisTemplate<String,String> redisTemplateService;
@PostConstruct
public void init() {
redisTemplateService=redisTemplate;
}
@OnOpen
public void onOpen(Session session) {
WSClient.session = session;
}
@OnMessage
public void processMessage(String message) {
log.info("websocketRecive接收推送消息"+message);
int permission = Integer.parseInt(message)%5;
//查詢所有訂閱的客戶端的ip。
Set<String> keys = redisTemplateService.keys("ip:*");
for (String key : keys) {
// 根據(jù)登錄后存儲的客戶端ip,獲取權(quán)限地址
String s = redisTemplateService.opsForValue().get(key);
String[] split = s.split(",");
for (String s1 : split) {
//向含有推送過來的數(shù)據(jù)權(quán)限地址的客戶端推送告警數(shù)據(jù)。
if (s1.equals(permission+"")){
WebSocket.sendMessageToByIp(message,key.split(":")[1]);
}
}
}
}
@OnError
public void processError(Throwable t) {
WSClient.session = null;
try {
Thread.sleep(5000);
startWS();
} catch (InterruptedException e) {
log.error("---websocket processError InterruptedException---", e);
}
log.error("---websocket processError error---", t);
}
@OnClose
public void processClose(Session session, CloseReason closeReason) {
log.error(session.getId() + closeReason.toString());
}
public void send(String sessionId, String message) {
try {
log.info("send Msg:" + message);
if (Objects.nonNull(WSClient.session)) {
WSClient.session.getBasicRemote().sendText(message);
} else {
log.info("---websocket error----");
}
} catch (Exception e) {
log.error("---websocket send error---", e);
}
}
}4、ws客戶端推送消息,推送消息和上面服務(wù)端類似。
這邊是根據(jù)ip
package com.ghh.websocketRecive.wsMessage;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.ghh.websocketRecive.service.UserService;
import lombok.Builder;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{ip}")
@Component
public class WebSocket {
private Logger log = LoggerFactory.getLogger(WebSocket.class);
private static int onlineCount = 0;
private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
private Session session;
/** 當(dāng)前連接服務(wù)端的客戶端ip */
private String ip;
@Autowired
RedisTemplate<String,String> redisTemplate;
private static RedisTemplate<String,String> redisTemplateService;
@PostConstruct
public void init() {
redisTemplateService = redisTemplate;
}
@OnOpen
public void onOpen(@PathParam("ip") String ip, Session session) throws IOException {
log.info("ip:{}客戶端已連接:,當(dāng)前客戶端數(shù)量:{}", ip, onlineCount+1);
this.ip = ip;
this.session = session;
// 接入一個websocket則生成一個隨機(jī)序列號
addOnlineCount();
//根據(jù)隨機(jī)序列號存儲一個socket連接
clients.put(ip, this);
}
@OnClose
public void onClose() throws IOException {
clients.remove(ip);
onlineCount--;
subOnlineCount();
}
/**
* @Description: 客戶端發(fā)送消息調(diào)用此方法
* @DateTime: 2021/6/16 15:35
* @Author: GHH
* @Params: [message]
* @Return void
*/
@OnMessage
public void onMessage(String message) throws IOException {
log.info("客戶端發(fā)送消onMessage方法成功");
}
@OnError
public void onError(Session session, Throwable error) {
log.error("{}", error);
}
public static void sendMessageTo(String message, String userNo) throws IOException {
WebSocket webSocket = clients.get(userNo);
if (webSocket != null && webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
}
}
/**
* @Description: 推送到指定的ip值的記錄
* @DateTime: 2021/6/15 17:11
* @Author: GHH
* @Params: [message, id]
* @Return void
*/
public static void sendMessageToByIp(String message, String ip) {
for (WebSocket item : clients.values()) {
//遍歷鏈接的value值,如果當(dāng)前傳入的ip中鏈接的用戶包含value值,則推送。
if (item.ip.equals(ip)) {
item.session.getAsyncRemote().sendText(message);
}
}
}
/**
* @Description: 推送所有開啟的信息
* @DateTime: 2021/6/15 17:13
* @Author: GHH
* @Params: [message]
* @Return void
*/
public static void sendMessageAll(String message){
for (WebSocket item : clients.values()) {
item.session.getAsyncRemote().sendText(message);
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocket.onlineCount--;
}
public static synchronized Map<String, WebSocket> getClients() {
return clients;
}
}概述:
至此,簡易的demo搭建完成,項目gitee網(wǎng)址:https://gitee.com/ghhNB/study.git
到此這篇關(guān)于SpringBoot整合WebSocket的客戶端和服務(wù)端的實現(xiàn)的文章就介紹到這了,更多相關(guān)SpringBoot整合WebSocket內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot動態(tài)定時任務(wù)實現(xiàn)完整版
最近有幸要開發(fā)個動態(tài)定時任務(wù),這里簡單再梳理一下,下面這篇文章主要給大家介紹了關(guān)于SpringBoot動態(tài)定時任務(wù)實現(xiàn)的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-02-02
JAVA構(gòu)造方法/構(gòu)造器以及this使用方式
這篇文章主要介紹了JAVA構(gòu)造方法/構(gòu)造器以及this使用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-03-03
BufferedWriter如何使用write方法實現(xiàn)換行
這篇文章主要介紹了BufferedWriter如何使用write方法實現(xiàn)換行的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
一篇文章帶你入門java算術(shù)運算符(加減乘除余,字符連接)
這篇文章主要介紹了Java基本數(shù)據(jù)類型和運算符,結(jié)合實例形式詳細(xì)分析了java基本數(shù)據(jù)類型、數(shù)據(jù)類型轉(zhuǎn)換、算術(shù)運算符、邏輯運算符等相關(guān)原理與操作技巧,需要的朋友可以參考下2021-08-08
Spring Web MVC框架學(xué)習(xí)之配置Spring Web MVC
這一篇文章講的是Spring Web MVC各部分的配置方法,包括Java代碼配置和XML文件配置以及MVC命名空間的使用方法。2017-03-03
Spring @Valid和@Validated區(qū)別和用法實例
這篇文章主要介紹了Spring @Valid和@Validated區(qū)別和用法實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-04-04

