SpringBoot使用SSE進(jìn)行實(shí)時(shí)通知前端的實(shí)現(xiàn)代碼
說(shuō)明
項(xiàng)目有個(gè)需求是要實(shí)時(shí)通知前端,告訴前端這個(gè)任務(wù)加載好了。然后想了2個(gè)方案,一種是用websocket進(jìn)行長(zhǎng)連接,一種是使用SSE(Sever Send Event),是HTTP協(xié)議中的一種,Content-Type為text/event-stream,能夠保持長(zhǎng)連接。
websocket是前端既能向后端發(fā)送消息,后端也能向前端發(fā)送消息。
SSE是只能后端向前端發(fā)送消息。
因?yàn)橹恍枰蠖送ㄖ?,所以我這里選擇了使用SSE實(shí)現(xiàn)。
這里先做個(gè)筆記,怕以后忘記怎么使用。
maven依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.project</groupId>
<artifactId>test</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>test</name>
<description>test</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--web依賴,內(nèi)嵌入tomcat,SSE依賴于該jar包,只要有該依賴就能使用SSE-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--lombok依賴,用來(lái)對(duì)象省略寫set、get方法-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>SSE工具類代碼
package com.etone.project.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@Slf4j
public class SseEmitterServer {
/**
* 當(dāng)前連接數(shù)
*/
private static AtomicInteger count = new AtomicInteger(0);
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
public static SseEmitter connect(String userId){
//設(shè)置超時(shí)時(shí)間,0表示不過(guò)期,默認(rèn)是30秒,超過(guò)時(shí)間未完成會(huì)拋出異常
SseEmitter sseemitter = new SseEmitter(0L);
//注冊(cè)回調(diào)
sseemitter.onCompletion(completionCallBack(userId));
//這個(gè)onError在springbooot低版本沒(méi)有這個(gè)方法,公司springboot1.4.2版本,沒(méi)有這個(gè)方法,可以進(jìn)行注釋。
sseemitter.onError(errorCallBack(userId));
sseemitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId,sseemitter);
//數(shù)量+1
count.getAndIncrement();
log.info("create new sse connect ,current user:{}",userId);
return sseemitter;
}
/**
* 給指定用戶發(fā)消息
*/
public static void sendMessage(String userId, String message){
if(sseEmitterMap.containsKey(userId)){
try{
sseEmitterMap.get(userId).send(message);
}catch (IOException e){
log.error("user id:{}, send message error:{}",userId,e.getMessage());
e.printStackTrace();
}
}
}
/**
* 想多人發(fā)送消息,組播
*/
public static void groupSendMessage(String groupId, String message){
if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){
sseEmitterMap.forEach((k,v) -> {
try{
if(k.startsWith(groupId)){
v.send(message, MediaType.APPLICATION_JSON);
}
}catch (IOException e){
log.error("user id:{}, send message error:{}",groupId,message);
removeUser(k);
}
});
}
}
public static void batchSendMessage(String message) {
sseEmitterMap.forEach((k,v)->{
try{
v.send(message,MediaType.APPLICATION_JSON);
}catch (IOException e){
log.error("user id:{}, send message error:{}",k,e.getMessage());
removeUser(k);
}
});
}
/**
* 群發(fā)消息
*/
public static void batchSendMessage(String message, Set<String> userIds){
userIds.forEach(userid->sendMessage(userid,message));
}
//移除用戶
public static void removeUser(String userid){
sseEmitterMap.remove(userid);
//數(shù)量-1
count.getAndDecrement();
log.info("remove user id:{}",userid);
}
public static List<String> getIds(){
return new ArrayList<>(sseEmitterMap.keySet());
}
public static int getUserCount(){
return count.intValue();
}
private static Runnable completionCallBack(String userId) {
return () -> {
log.info("結(jié)束連接,{}",userId);
removeUser(userId);
};
}
private static Runnable timeoutCallBack(String userId){
return ()->{
log.info("連接超時(shí),{}",userId);
removeUser(userId);
};
}
private static Consumer<Throwable> errorCallBack(String userId){
return throwable -> {
log.error("連接異常,{}",userId);
removeUser(userId);
};
}
}Controller測(cè)試代碼
package com.project.test.controller;
import com.hjl.test.util.SseEmitterServer;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping(value = "/test")
public class TestController {
//sse連接接口
@GetMapping (value = "/sse/connect/{id}")
public SseEmitter connect(@PathVariable String id){
return SseEmitterServer.connect(id);
}
//sse向指定用戶發(fā)送消息接口
@GetMapping (value = "/sse/send/{id}")
public Map<String,Object> send(@PathVariable String id,@RequestParam(value = "message", required = false) String message){
Map<String,Object> returnMap = new HashMap<>();
//向指定用戶發(fā)送信息
SseEmitterServer.sendMessage(id,message);
returnMap.put("message","向id為"+id+"的用戶發(fā)送:"+message+"成功!");
returnMap.put("status","200");
returnMap.put("result",null);
return returnMap;
}
//sse向所有已連接用戶發(fā)送消息接口
@GetMapping (value = "/sse/batchSend")
public Map<String,Object> batchSend(@RequestParam(value = "message", required = false) String message){
Map<String,Object> returnMap = new HashMap<>();
//向指定用戶發(fā)送信息
SseEmitterServer.batchSendMessage(message);
returnMap.put("message",message+"消息發(fā)送成功!");
returnMap.put("status","200");
returnMap.put("result",null);
return returnMap;
}
//sse關(guān)閉接口
@GetMapping (value = "/sse/close/{id}")
public Map<String,Object> close(@PathVariable String id){
Map<String,Object> returnMap = new HashMap<>();
//移除id
SseEmitterServer.removeUser(id);
System.out.println("當(dāng)前連接用戶id:"+SseEmitterServer.getIds());
returnMap.put("message","連接關(guān)閉成功!");
returnMap.put("status","200");
returnMap.put("result",null);
return returnMap;
}
}測(cè)試結(jié)果如下:
這里測(cè)試SSE連接,就像正常接口那樣請(qǐng)求就行。
本地調(diào)用接口/sse/connect/1如下:
這里我連接2個(gè)用戶,用來(lái)模擬向指定用戶id發(fā)送信息和批量向已連接的用戶發(fā)送消。


后端服務(wù)打印如下:

本地調(diào)用接口/sse/send/1如下:

用戶1的結(jié)果如下,發(fā)現(xiàn)它收到了消息:

用戶2沒(méi)有收到結(jié)果,如下:

本地調(diào)用接口/sse/batchSend如下:
批量向所有已經(jīng)連接的用戶發(fā)送消息。

用戶1結(jié)果如下,發(fā)現(xiàn)接收到了消息:

用戶2結(jié)果如下,發(fā)現(xiàn)也接收到了消息:

測(cè)試結(jié)果都符合預(yù)期。
點(diǎn)擊postman的close按鈕,關(guān)閉連接:


發(fā)現(xiàn)前端連接雖然關(guān)閉了,但是后端實(shí)際還在連接中,根本沒(méi)有移除用戶的提示:

所以這里還需要自己手動(dòng)寫關(guān)閉接口測(cè)試。
本地調(diào)用接口/sse/close/1如下:

可以看到把用戶id為1的給移除了,只剩用戶2還在連接中。

這里所有測(cè)試完成,結(jié)果符合預(yù)期。
注意
將超時(shí)時(shí)間由原來(lái)的0改為默認(rèn)的30秒,會(huì)報(bào)錯(cuò)。

測(cè)試結(jié)果如下:


這里直接出現(xiàn)了一個(gè)異常:org.springframework.web.context.request.async.AsyncRequestTimeoutException
甚至連接都斷開了。
將springboot降為低版本如1.4.2.RELEASE。
使用postman進(jìn)行測(cè)試的時(shí)候,發(fā)現(xiàn)它不是一直在請(qǐng)求中:如下:
將Springboot降為1.4.2.RELEASE

springboot的1.4.2.RELEASE版本沒(méi)有onError方法,需要注釋掉。

postman測(cè)試如下:
低版本測(cè)試的時(shí)候發(fā)現(xiàn)它有一個(gè)這個(gè)連接可以直接看到,而使用springboot版本2.x版本就發(fā)現(xiàn)它一直處于發(fā)送請(qǐng)求的狀態(tài),什么時(shí)候后端向前端發(fā)送了消息,它就顯示這個(gè)。
springboot的1.4.2.RELEASE版本結(jié)果:

springboot的2.7.3版本結(jié)果:

這里先將這種情況先記錄下來(lái)先,等后面有時(shí)間再研究。怎么高版本就不能向低版本那樣返回這個(gè)連接信息呢?所以SpringBoot高版本使用SSE連接的時(shí)候一直處于Sending request這種情況,這種情況是正常的嗎?
到此這篇關(guān)于SpringBoot使用SSE進(jìn)行實(shí)時(shí)通知前端的文章就介紹到這了,更多相關(guān)SpringBoot實(shí)時(shí)通知前端內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java springboot poi 從controller 接收不同類型excel 文件處理
這篇文章主要介紹了java springboot poi 從controller 接收不同類型excel 文件處理,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-10-10
mybatis plus實(shí)體類中字段映射mysql中的json格式方式
這篇文章主要介紹了mybatis plus實(shí)體類中字段映射mysql中的json格式方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
springboot項(xiàng)目整合mybatis并配置mybatis中間件的實(shí)現(xiàn)
這篇文章主要介紹了springboot項(xiàng)目整合mybatis并配置mybatis中間件的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
JAVA實(shí)現(xiàn)漢字轉(zhuǎn)拼音功能代碼實(shí)例
這篇文章主要介紹了JAVA實(shí)現(xiàn)漢字轉(zhuǎn)拼音功能代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05
Java 數(shù)據(jù)結(jié)構(gòu)與算法系列精講之排序算法
排序算法是《數(shù)據(jù)結(jié)構(gòu)與算法》中最基本的算法之一。排序算法可以分為內(nèi)部排序和外部排序,內(nèi)部排序是數(shù)據(jù)記錄在內(nèi)存中進(jìn)行排序,而外部排序是因排序的數(shù)據(jù)很大,一次不能容納全部的排序記錄,在排序過(guò)程中需要訪問(wèn)外存2022-02-02

