Springboot使用SSE推送消息到客戶端的實(shí)現(xiàn)
1、場(chǎng)景:
服務(wù)端主動(dòng)推動(dòng)消息到客戶端(Electron 桌面應(yīng)用)
普通的HTTP/HTTPS請(qǐng)求要先客戶端發(fā)送請(qǐng)求,然后服務(wù)端才能返回結(jié)果
服務(wù)端主動(dòng)推送數(shù)據(jù)到客戶端,有兩種方案:
SSE:只能從服務(wù)端主動(dòng)推送數(shù)據(jù)到客戶端(單向),客戶端發(fā)送數(shù)據(jù)還是要使用HTTP/HTTPS請(qǐng)求
Socket 通信:客戶端和服務(wù)端都可以發(fā)送數(shù)據(jù)給對(duì)方(雙向)
先記錄SSE的配置(我的服務(wù)端之前是用nodejs express 寫(xiě)的,已經(jīng)使用 socket.io與客戶端適配好了,現(xiàn)在用Springboot重構(gòu))
2、實(shí)現(xiàn):
1、springboot 配置:
1、先實(shí)現(xiàn)Server層,方便其他控制器調(diào)用
package com.xxx.controller;
import com.qiang.service.SseService;
import com.qiang.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@RestController
@RequestMapping("/sse")
public class SseController {
// 注入 SSE 服務(wù)類
@Autowired
private SseService sseService;
@Autowired
private UserService userService;
/**
* 建立 SSE 連接(調(diào)用 Service 的注冊(cè)方法)
*/
@GetMapping("/connect")
public SseEmitter connect(@RequestParam String clientId) {
// 查詢會(huì)話成員表,userId通過(guò)客戶端id獲取該用戶所有的群聊
String userId = clientId.split("_")[2].trim();
List<String> groupIdList = userService.getUserGroupChat(userId);
Set<String> groupIdSet = new HashSet<>();
if (groupIdList != null && !groupIdList.isEmpty()) {
groupIdSet = new HashSet<>(groupIdList); // List → Set,自動(dòng)去重
}
return sseService.registerClient(clientId, groupIdSet);
}
/**
* 手動(dòng)推送(調(diào)用 Service 的推送方法)
*/
@PostMapping("/push")
public String push(@RequestBody Map<String, String> params) {
String clientId = params.get("clientId");
String message = params.get("message");
return sseService.sendMessage(clientId, "business", message);
}
/**
* 獲取在線客戶端數(shù)量
*/
@GetMapping("/count")
public String getClientCount() {
return "當(dāng)前在線客戶端數(shù)量:" + sseService.getConnectedClientCount();
}
}2、再實(shí)現(xiàn)控制層:
package com.xxx.controller;
import com.qiang.service.SseService;
import com.qiang.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@RestController
@RequestMapping("/sse")
public class SseController {
// 注入 SSE 服務(wù)類
@Autowired
private SseService sseService;
@Autowired
private UserService userService;
/**
* 建立 SSE 連接(調(diào)用 Service 的注冊(cè)方法)
*/
@GetMapping("/connect")
public SseEmitter connect(@RequestParam String clientId) {
// 查詢會(huì)話成員表,userId通過(guò)客戶端id獲取該用戶所有的群聊
String userId = clientId.split("_")[2].trim();
List<String> groupIdList = userService.getUserGroupChat(userId);
Set<String> groupIdSet = new HashSet<>();
if (groupIdList != null && !groupIdList.isEmpty()) {
groupIdSet = new HashSet<>(groupIdList); // List → Set,自動(dòng)去重
}
return sseService.registerClient(clientId, groupIdSet);
}
/**
* 手動(dòng)推送(調(diào)用 Service 的推送方法)
*/
@PostMapping("/push")
public String push(@RequestBody Map<String, String> params) {
String clientId = params.get("clientId");
String message = params.get("message");
return sseService.sendMessage(clientId, "business", message);
}
/**
* 獲取在線客戶端數(shù)量
*/
@GetMapping("/count")
public String getClientCount() {
return "當(dāng)前在線客戶端數(shù)量:" + sseService.getConnectedClientCount();
}
}2、客戶端(Electron 配置):
eventsource 是瀏覽器對(duì)象
我是在客戶端主進(jìn)程中接收消息的,要下載依賴。如果是在渲染進(jìn)程或者是普通前端使用則不需要下載依賴
"eventsource": "^2.0.2"
1、工具函數(shù):
// src/util/sseClient.js(主進(jìn)程專用)
const EventSource = require('eventsource');
class SseClient {
constructor(clientId) {
this.clientId = clientId;
this.isConnected = false; // 標(biāo)記是否真正連接成功
this.initSse();
}
initSse() {
this.close(); // 關(guān)閉舊連接
const sseUrl = `http://localhost:8088/sse/connect?clientId=${this.clientId}`;
console.log(`[SSE] 嘗試連接:${sseUrl}`);
this.es = new EventSource(sseUrl);
// 1. 連接成功(標(biāo)記真正的連接狀態(tài))
this.es.onopen = () => {
this.isConnected = true;
console.log('[SSE] 連接成功');
};
// 2. 優(yōu)化錯(cuò)誤處理(過(guò)濾無(wú)害錯(cuò)誤)
this.es.onerror = (e) => {
// 過(guò)濾:連接成功前的無(wú)消息錯(cuò)誤(無(wú)害)
if (!this.isConnected && e.message === undefined) {
console.log('[SSE] 初始化階段臨時(shí)錯(cuò)誤(無(wú)害):', e.type);
return; // 不打印錯(cuò)誤,避免干擾
}
// 真正的錯(cuò)誤(連接斷開(kāi)/失敗)
this.isConnected = false;
console.error('[SSE] 真正的連接錯(cuò)誤:', {
type: e.type,
message: e.message || '未知錯(cuò)誤',
readyState: this.es.readyState // 0:連接中, 1:已連接, 2:已關(guān)閉
});
// 僅在連接關(guān)閉時(shí)重連
if (this.es.readyState === EventSource.CLOSED) {
console.log(`[SSE] 3秒后嘗試重連...`);
setTimeout(() => this.initSse(), 3000);
}
};
// 3. 正常接收消息
this.es.onmessage = (e) => {
try {
const cleanData = e.data.replace(/^data: /, '').trim();
const messageObj = JSON.parse(cleanData);
// 打印解析結(jié)果(驗(yàn)證)
console.log('[SSE] 解析后的完整對(duì)象:', messageObj);
} catch (err) {
// 解析失敗時(shí)的容錯(cuò)
console.warn('[SSE] 解析失敗,原始數(shù)據(jù):', e.data);
console.error('[SSE] 解析錯(cuò)誤詳情:', err);
}
};
// 4. 監(jiān)聽(tīng)自定義事件(如 notification/business)
this.es.addEventListener('notification', (e) => {
const data = JSON.parse(e.data);
console.log('[SSE] 通知消息:', data);
});
}
close() {
if (this.es) {
this.es.close();
this.es = null;
this.isConnected = false;
}
}
// 手動(dòng)推送(修復(fù)后的 POST 版本)
async triggerPush(message) {
if (!this.isConnected) {
console.warn('[SSE] 未連接,無(wú)法推送');
return null;
}
try {
const response = await fetch(`${this.serverUrl || 'http://localhost:8088'}/sse/push`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ clientId: this.clientId, message })
});
const result = await response.text();
console.log('[SSE] 手動(dòng)推送結(jié)果:', result);
return result;
} catch (err) {
console.error('[SSE] 手動(dòng)推送失敗:', err);
return null;
}
}
}
module.exports = SseClient;2、調(diào)用:
// clientId 要唯一,否則服務(wù)端推送消息時(shí)會(huì)有影響
new SseClient("自定義的clientId");
到此這篇關(guān)于Springboot使用SSE推送消息到客戶端的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Springboot SSE推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot+Vue3整合SSE實(shí)現(xiàn)實(shí)時(shí)消息推送功能
- SpringBoot整合SSE接口實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)推送
- SpringBoot+SseEmitter和Vue3+EventSource實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)推送
- springboot -sse -flux 服務(wù)器推送消息的方法
- SpringBoot SSE服務(wù)端主動(dòng)推送事件的實(shí)現(xiàn)
- Springboot集成SSE實(shí)現(xiàn)單工通信消息推送流程詳解
相關(guān)文章
Eclipse查看開(kāi)發(fā)包jar里源代碼的方法
這篇文章主要介紹了Eclipse查看開(kāi)發(fā)包jar里源代碼的方法的相關(guān)資料,需要的朋友可以參考下2017-07-07
SpringBoot事件發(fā)布與監(jiān)聽(tīng)超詳細(xì)講解
今天去官網(wǎng)查看spring boot資料時(shí),在特性中看見(jiàn)了系統(tǒng)的事件及監(jiān)聽(tīng)章節(jié),所以下面這篇文章主要給大家介紹了關(guān)于SpringBoot事件發(fā)布和監(jiān)聽(tīng)的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-11-11
Java?數(shù)據(jù)結(jié)構(gòu)與算法系列精講之棧
棧(stack)又名堆棧,它是一種運(yùn)算受限的線性表。限定僅在表尾進(jìn)行插入和刪除操作的線性表。這一端被稱為棧頂,相對(duì)地,把另一端稱為棧底,棧是基礎(chǔ)中的基礎(chǔ),如果你還沒(méi)掌握透徹就來(lái)接著往下看吧2022-02-02
詳解Java的MyBatis框架中SQL語(yǔ)句映射部分的編寫(xiě)
這篇文章主要介紹了Java的MyBatis框架中SQL語(yǔ)句映射部分的編寫(xiě),文中分為resultMap和增刪查改實(shí)現(xiàn)兩個(gè)部分來(lái)講解,需要的朋友可以參考下2016-04-04
Java SMM框架關(guān)聯(lián)關(guān)系映射示例講解
SSM框架是spring MVC ,spring和mybatis框架的整合,是標(biāo)準(zhǔn)的MVC模式,將整個(gè)系統(tǒng)劃分為表現(xiàn)層,controller層,service層,DAO層四層,使用spring MVC負(fù)責(zé)請(qǐng)求的轉(zhuǎn)發(fā)和視圖管理,spring實(shí)現(xiàn)業(yè)務(wù)對(duì)象管理,mybatis作為數(shù)據(jù)對(duì)象的持久化引擎2022-08-08
Eclipse導(dǎo)入項(xiàng)目報(bào)錯(cuò)問(wèn)題解決方案
這篇文章主要介紹了Eclipse導(dǎo)入項(xiàng)目報(bào)錯(cuò)問(wèn)題解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07
Maven Spring jar包啟動(dòng)報(bào)錯(cuò)問(wèn)題解決方案
maven 編譯jar包,放在linux服務(wù)器啟動(dòng)不起來(lái),提示:xxxx-0.0.1-SNAPSHOT.jar中沒(méi)有主清單屬性,接下來(lái)通過(guò)本文給大家分享問(wèn)題原因及解決方案,感興趣的朋友跟隨小編一起看看吧2023-10-10
Java使用easyExcel實(shí)現(xiàn)導(dǎo)入功能
這篇文章介紹了Java使用easyExcel實(shí)現(xiàn)導(dǎo)入功能的方法,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-10-10

