SpringBoot實(shí)現(xiàn)消息推送功能的示例代碼
引言
想象一下這個(gè)場(chǎng)景:你的APP像個(gè)靦腆的男生,只會(huì)傻傻等著用戶來“敲門”(刷新頁面),而不知道主動(dòng)說“嘿,我有新消息給你!”這多尷尬??!消息推送就像給服務(wù)器裝了社交癥,讓它能從幕后跳出來大喊:“注意!有熱乎的消息!”
一、推送技術(shù)選型:給服務(wù)器裝上“大喇叭”
SSE(Server-Sent Events) - 像單相思,服務(wù)器可以一直對(duì)客戶端叨叨叨,但客戶端只能聽著 WebSocket - 像熱戀中的情侶,雙方可以隨時(shí)互發(fā)消息 輪詢(Polling) - 像查崗的女朋友,隔幾秒就問一次“有新消息嗎?” 長輪詢(Long Polling) - 像有耐心的女朋友,等不到消息就不掛電話
今天咱們重點(diǎn)玩一下SSE,因?yàn)樗?jiǎn)單直接,就像給服務(wù)器裝了個(gè)校園廣播站!
二、SpringBoot推送實(shí)戰(zhàn):三步搞定
第一步:添加依賴(給項(xiàng)目喂點(diǎn)“能量飲料”)
<!-- pom.xml -->
<dependencies>
<!-- SpringBoot基礎(chǔ)套餐 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 給模板引擎加點(diǎn)料 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<!-- 讓我們能處理異步請(qǐng)求 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
第二步:創(chuàng)建SSE控制器(服務(wù)器的“播音室”)
package com.example.pushdemo.controller;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
@RestController
@RequestMapping("/sse")
public class SseController {
// 保存所有連接的客戶端,CopyOnWriteArrayList是線程安全的
private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();
/**
* 客戶端連接入口 - 相當(dāng)于打開收音機(jī)
* @return SseEmitter對(duì)象
*/
@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect() {
// 設(shè)置連接超時(shí)時(shí)間(毫秒),0表示永不超時(shí)
SseEmitter emitter = new SseEmitter(0L);
// 連接建立時(shí)的處理
emitter.onCompletion(() -> {
System.out.println("客戶端斷開連接了,有點(diǎn)小失落...");
emitters.remove(emitter);
});
emitter.onTimeout(() -> {
System.out.println("客戶端連接超時(shí)了,是不是網(wǎng)不好?");
emitters.remove(emitter);
});
// 添加到連接列表
emitters.add(emitter);
try {
// 發(fā)送歡迎消息
emitter.send(SseEmitter.event()
.name("welcome") // 事件名稱
.data("連接成功!我是話癆服務(wù)器,我會(huì)主動(dòng)給你推送消息!"));
} catch (IOException e) {
emitter.completeWithError(e);
}
System.out.println("新客戶端加入,當(dāng)前連接數(shù):" + emitters.size());
return emitter;
}
/**
* 向所有客戶端廣播消息 - 服務(wù)器開始廣播啦!
* @param message 要推送的消息
* @return 推送結(jié)果
*/
@PostMapping("/broadcast")
public String broadcast(@RequestParam String message) {
System.out.println("準(zhǔn)備廣播消息:" + message);
int successCount = 0;
// 遍歷所有連接
for (SseEmitter emitter : emitters) {
try {
// 發(fā)送消息
emitter.send(SseEmitter.event()
.name("message") // 事件類型
.data(message + " - " + System.currentTimeMillis()));
successCount++;
System.out.println("消息已推送給客戶端:" + emitter);
} catch (IOException e) {
System.out.println("推送失敗,移除失效連接");
emitters.remove(emitter);
}
}
return String.format("廣播完成!成功推送給 %d/%d 個(gè)客戶端",
successCount, emitters.size());
}
/**
* 發(fā)送系統(tǒng)通知
*/
@PostMapping("/system-notice")
public String sendSystemNotice(@RequestParam String notice) {
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event()
.name("system")
.data("系統(tǒng)通知:" + notice));
} catch (IOException e) {
// 靜默處理錯(cuò)誤連接
}
}
return "系統(tǒng)通知已發(fā)送";
}
}
第三步:創(chuàng)建WebSocket配置(備選方案,雙向通信)
package com.example.pushdemo.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new MyWebSocketHandler(), "/ws")
.setAllowedOrigins("*"); // 生產(chǎn)環(huán)境記得限制域名哦!
}
}
package com.example.pushdemo.config;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
public class MyWebSocketHandler extends TextWebSocketHandler {
private static final CopyOnWriteArraySet<WebSocketSession> sessions =
new CopyOnWriteArraySet<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
sessions.add(session);
System.out.println("WebSocket連接建立,當(dāng)前連接數(shù):" + sessions.size());
try {
session.sendMessage(new TextMessage("?? 歡迎來到WebSocket聊天室!"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
protected void handleTextMessage(WebSocketSession session,
TextMessage message) throws Exception {
// 廣播收到的消息
String payload = message.getPayload();
System.out.println("收到消息:" + payload);
for (WebSocketSession s : sessions) {
if (s.isOpen()) {
s.sendMessage(new TextMessage("用戶說:" + payload));
}
}
}
@Override
public void afterConnectionClosed(WebSocketSession session,
org.springframework.web.socket.CloseStatus status) {
sessions.remove(session);
System.out.println("WebSocket連接關(guān)閉");
}
}
第四步:創(chuàng)建HTML測(cè)試頁面(給客戶端配個(gè)"收音機(jī)")
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>消息推送測(cè)試 - 服務(wù)器的碎碎念</title>
<style>
body {
font-family: 'Microsoft YaHei', sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
}
.container {
background: white;
border-radius: 15px;
padding: 30px;
box-shadow: 0 10px 30px rgba(0,0,0,0.1);
}
h1 {
color: #2c3e50;
text-align: center;
margin-bottom: 30px;
}
.card {
background: #f8f9fa;
border-radius: 10px;
padding: 20px;
margin: 20px 0;
border-left: 5px solid #3498db;
}
.message-area {
height: 300px;
overflow-y: auto;
border: 1px solid #ddd;
border-radius: 8px;
padding: 15px;
margin: 20px 0;
background: #fff;
}
.message {
padding: 10px;
margin: 10px 0;
border-radius: 8px;
animation: fadeIn 0.5s;
}
.system { background: #e3f2fd; }
.welcome { background: #e8f5e9; }
.user { background: #fff3e0; }
@keyframes fadeIn {
from { opacity: 0; transform: translateY(10px); }
to { opacity: 1; transform: translateY(0); }
}
button {
background: #3498db;
color: white;
border: none;
padding: 12px 24px;
border-radius: 6px;
cursor: pointer;
font-size: 16px;
transition: all 0.3s;
margin: 5px;
}
button:hover {
background: #2980b9;
transform: translateY(-2px);
}
.btn-danger {
background: #e74c3c;
}
.btn-success {
background: #2ecc71;
}
.input-group {
display: flex;
gap: 10px;
margin: 20px 0;
}
input {
flex: 1;
padding: 12px;
border: 2px solid #ddd;
border-radius: 6px;
font-size: 16px;
}
input:focus {
border-color: #3498db;
outline: none;
}
</style>
</head>
<body>
<div class="container">
<h1>服務(wù)器廣播站測(cè)試</h1>
<div class="card">
<h3>連接狀態(tài)</h3>
<p id="status">準(zhǔn)備連接服務(wù)器...</p>
<button onclick="connectSSE()">連接SSE服務(wù)器</button>
<button onclick="connectWebSocket()">連接WebSocket</button>
<button class="btn-danger" onclick="disconnect()">斷開連接</button>
</div>
<div class="card">
<h3>消息測(cè)試</h3>
<div class="input-group">
<input type="text" id="messageInput"
placeholder="輸入要廣播的消息..." />
<button onclick="sendBroadcast()">廣播消息</button>
<button class="btn-success" onclick="sendSystemNotice()">
發(fā)送系統(tǒng)通知
</button>
</div>
</div>
<div class="card">
<h3>收到的消息</h3>
<div id="messageArea" class="message-area"></div>
<button onclick="clearMessages()">清空消息</button>
<span id="counter">消息數(shù)量: 0</span>
</div>
</div>
<script>
let eventSource = null;
let ws = null;
let messageCount = 0;
// 添加消息到顯示區(qū)域
function addMessage(content, type = 'system') {
const area = document.getElementById('messageArea');
const message = document.createElement('div');
message.className = `message ${type}`;
message.innerHTML = `
<strong>[${new Date().toLocaleTimeString()}]</strong>
<span>${content}</span>
`;
area.appendChild(message);
area.scrollTop = area.scrollHeight;
messageCount++;
document.getElementById('counter').textContent =
`消息數(shù)量: ${messageCount}`;
}
// 連接SSE
function connectSSE() {
if (eventSource) {
addMessage('已經(jīng)連接過了,別著急嘛!');
return;
}
eventSource = new EventSource('/sse/connect');
eventSource.onopen = () => {
document.getElementById('status').innerHTML =
'SSE連接成功!服務(wù)器現(xiàn)在可以主動(dòng)推送消息了';
addMessage('SSE連接已建立', 'welcome');
};
// 監(jiān)聽不同類型的消息
eventSource.addEventListener('welcome', (e) => {
addMessage(e.data, 'welcome');
});
eventSource.addEventListener('message', (e) => {
addMessage(`收到廣播: ${e.data}`, 'user');
});
eventSource.addEventListener('system', (e) => {
addMessage(e.data, 'system');
});
eventSource.onerror = (e) => {
document.getElementById('status').innerHTML =
'SSE連接出錯(cuò),嘗試重連中...';
console.error('SSE錯(cuò)誤:', e);
// 3秒后重連
setTimeout(() => {
if (eventSource.readyState === EventSource.CLOSED) {
disconnect();
connectSSE();
}
}, 3000);
};
}
// 連接WebSocket
function connectWebSocket() {
if (ws && ws.readyState === WebSocket.OPEN) {
addMessage('WebSocket已經(jīng)連接了!');
return;
}
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const host = window.location.host;
ws = new WebSocket(`${protocol}//${host}/ws`);
ws.onopen = () => {
document.getElementById('status').innerHTML =
'WebSocket連接成功!可以雙向通信了';
addMessage('WebSocket連接已建立', 'welcome');
};
ws.onmessage = (e) => {
addMessage(`WebSocket消息: ${e.data}`, 'user');
};
ws.onerror = (e) => {
addMessage('WebSocket連接錯(cuò)誤', 'system');
};
ws.onclose = () => {
document.getElementById('status').innerHTML =
'WebSocket連接已關(guān)閉';
};
}
// 發(fā)送廣播消息
async function sendBroadcast() {
const input = document.getElementById('messageInput');
const message = input.value.trim();
if (!message) {
addMessage('請(qǐng)輸入要發(fā)送的消息!', 'system');
return;
}
try {
const response = await fetch('/sse/broadcast', {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
body: `message=${encodeURIComponent(message)}`
});
const result = await response.text();
addMessage(`${result}`, 'system');
input.value = '';
} catch (error) {
addMessage('發(fā)送失敗: ' + error, 'system');
}
}
// 發(fā)送系統(tǒng)通知
async function sendSystemNotice() {
const notice = prompt('請(qǐng)輸入系統(tǒng)通知內(nèi)容:', '服務(wù)器即將維護(hù)');
if (!notice) return;
try {
const response = await fetch(`/sse/system-notice?notice=${encodeURIComponent(notice)}`, {
method: 'POST'
});
const result = await response.text();
addMessage(`${result}`, 'system');
} catch (error) {
addMessage('發(fā)送系統(tǒng)通知失敗', 'system');
}
}
// 斷開連接
function disconnect() {
if (eventSource) {
eventSource.close();
eventSource = null;
addMessage('SSE連接已關(guān)閉', 'system');
}
if (ws) {
ws.close();
ws = null;
addMessage('WebSocket連接已關(guān)閉', 'system');
}
document.getElementById('status').innerHTML =
'連接已斷開';
}
// 清空消息
function clearMessages() {
document.getElementById('messageArea').innerHTML = '';
messageCount = 0;
document.getElementById('counter').textContent = '消息數(shù)量: 0';
addMessage('消息已清空', 'system');
}
// 頁面加載時(shí)的小動(dòng)畫
window.onload = () => {
addMessage('消息推送演示系統(tǒng)已啟動(dòng)', 'welcome');
addMessage('試試點(diǎn)擊"連接SSE服務(wù)器"按鈕開始體驗(yàn)吧!', 'system');
};
// 監(jiān)聽鍵盤事件
document.getElementById('messageInput').addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
sendBroadcast();
}
});
</script>
</body>
</html>
三、進(jìn)階優(yōu)化:讓推送更"聰明"
1. 連接管理器(專業(yè)版)
@Component
public class ConnectionManager {
private final Map<String, SseEmitter> userConnections =
new ConcurrentHashMap<>();
private final Map<String, String> connectionUserMap =
new ConcurrentHashMap<>();
/**
* 添加用戶連接
*/
public SseEmitter addConnection(String userId) {
// 移除舊連接(避免重復(fù)登錄)
if (userConnections.containsKey(userId)) {
userConnections.get(userId).complete();
}
SseEmitter emitter = new SseEmitter(30 * 60 * 1000L); // 30分鐘超時(shí)
emitter.onCompletion(() -> removeConnection(userId));
emitter.onTimeout(() -> removeConnection(userId));
userConnections.put(userId, emitter);
// 發(fā)送連接成功消息
try {
emitter.send(SseEmitter.event()
.name("connected")
.data("用戶 " + userId + " 連接成功!"));
} catch (IOException e) {
removeConnection(userId);
}
return emitter;
}
/**
* 向指定用戶推送消息
*/
public void pushToUser(String userId, String message) {
SseEmitter emitter = userConnections.get(userId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.data(message));
} catch (IOException e) {
removeConnection(userId);
}
}
}
/**
* 向所有用戶廣播
*/
public void broadcast(String message) {
userConnections.forEach((userId, emitter) -> {
try {
emitter.send(SseEmitter.event()
.data(message));
} catch (IOException e) {
removeConnection(userId);
}
});
}
private void removeConnection(String userId) {
userConnections.remove(userId);
System.out.println("用戶 " + userId + " 的連接已移除");
}
}
2. 心跳檢測(cè)(保持連接活躍)
@Component
public class HeartbeatScheduler {
@Autowired
private ConnectionManager connectionManager;
@Scheduled(fixedRate = 25000) // 每25秒發(fā)送一次心跳
public void sendHeartbeat() {
connectionManager.broadcast("心跳檢測(cè) - " + new Date());
}
}
四、不同方案的對(duì)比總結(jié)
| 方案 | 優(yōu)點(diǎn) | 缺點(diǎn) | 適用場(chǎng)景 |
|---|---|---|---|
| SSE | 簡(jiǎn)單易用、自動(dòng)重連、HTTP協(xié)議友好 | 只能服務(wù)器到客戶端單向 | 實(shí)時(shí)通知、新聞推送、股票行情 |
| WebSocket | 雙向通信、實(shí)時(shí)性最好 | 實(shí)現(xiàn)復(fù)雜、需要額外協(xié)議 | 聊天室、在線游戲、協(xié)同編輯 |
| 長輪詢 | 兼容性好、實(shí)現(xiàn)簡(jiǎn)單 | 延遲高、服務(wù)器壓力大 | 兼容性要求高的老系統(tǒng) |
| 短輪詢 | 極其簡(jiǎn)單、無狀態(tài) | 實(shí)時(shí)性差、資源浪費(fèi) | 更新頻率低的應(yīng)用 |
五、總結(jié):讓服務(wù)器"開口說話"的藝術(shù)
通過這次探索,我們給SpringBoot服務(wù)器裝上了"嘴巴",讓它學(xué)會(huì)了主動(dòng)和客戶端聊天!總結(jié)一下關(guān)鍵點(diǎn):
- SSE是你的好朋友 - 對(duì)于服務(wù)器向客戶端的單向推送,SSE簡(jiǎn)單到讓人感動(dòng)
- 連接管理很重要 - 記得及時(shí)清理斷開的連接,不然服務(wù)器內(nèi)存會(huì)"爆炸"
- 錯(cuò)誤處理不能忘 - 網(wǎng)絡(luò)世界充滿了不確定性,要優(yōu)雅地處理各種異常
- 心跳檢測(cè)?;盍?/strong> - 定期發(fā)送心跳,防止連接被防火墻誤殺
- 生產(chǎn)環(huán)境要優(yōu)化 - 記得添加認(rèn)證、限流、集群支持等
推送消息就像談戀愛,不能太頻繁(用戶會(huì)煩),也不能太冷淡(用戶會(huì)跑),要掌握好節(jié)奏!而且千萬別"已讀不回",那比不推送還糟糕!
現(xiàn)在,你的服務(wù)器已經(jīng)從"悶葫蘆"變成了"社交達(dá)人",快去讓它和客戶端愉快地聊天吧!記住:好的推送系統(tǒng),應(yīng)該是用戶感覺不到它的存在,但需要時(shí)它永遠(yuǎn)在那里!
如果你想讓推送更有趣,可以添加表情包識(shí)別、消息優(yōu)先級(jí)、智能推送時(shí)間等功能。畢竟,誰不喜歡一個(gè)會(huì)"察言觀色"的服務(wù)器呢?
以上就是SpringBoot實(shí)現(xiàn)消息推送功能的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot消息推送的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java web數(shù)據(jù)可視化實(shí)現(xiàn)原理解析
這篇文章主要介紹了Java web數(shù)據(jù)可視化實(shí)現(xiàn)原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03
在CentOS系統(tǒng)上安裝Java的openjdk的方法
這篇文章主要介紹了在CentOS系統(tǒng)上安裝Java的openjdk的方法,同樣適用于Fedora等其他RedHat系的Linux系統(tǒng),需要的朋友可以參考下2015-06-06
使用java實(shí)現(xiàn)http多線程斷點(diǎn)下載文件(二)
下載工具我想沒有幾個(gè)人不會(huì)用的吧,前段時(shí)間比較無聊,花了點(diǎn)時(shí)間用java寫了個(gè)簡(jiǎn)單的http多線程下載程序,我實(shí)現(xiàn)的這個(gè)http下載工具功能很簡(jiǎn)單,就是一個(gè)多線程以及一個(gè)斷點(diǎn)恢復(fù),當(dāng)然下載是必不可少的,需要的朋友可以參考下2012-12-12

