spring中websocket定時任務(wù)實現(xiàn)實時推送
有時候業(yè)務(wù)要求websocket連接后,服務(wù)端實時每隔一段時間就將數(shù)據(jù)推送給客戶端進行響應(yīng),這時就需要websocket+定時任務(wù)一起來實現(xiàn)實時推送數(shù)據(jù)給客戶端了。
使用的定時任務(wù)方式為spring的TaskScheduler對象實現(xiàn)任務(wù)調(diào)度。
TaskScheduler定時任務(wù)實現(xiàn)
TaskScheduler接口提供了多種調(diào)度方法來實現(xiàn)運行任務(wù)的執(zhí)行。
public interface TaskScheduler {
//通過觸發(fā)器來決定task是否執(zhí)行
ScheduledFuture schedule(Runnable task, Trigger trigger);
//在starttime的時候執(zhí)行一次
ScheduledFuture schedule(Runnable task, Date startTime);
ScheduledFuture schedule(Runnable task, Instant startTime);
//從starttime開始每個period時間段執(zhí)行一次task
ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
//每隔period執(zhí)行一次
ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);
//從startTime開始每隔delay長時間執(zhí)行一次
ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
//每隔delay時間執(zhí)行一次
ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}
簡單測試一下
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
/**
* The type Task scheduler test.
*
* @author yjj
* @version 1.0
* @since 2022 -12-28 15:45:17
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskSchedulerTest {
private final TaskScheduler taskScheduler;
@Bean
public void test() {
//每隔3秒執(zhí)行一次
Trigger trigger = new CronTrigger("0/3 * * * * *");
//每隔1秒執(zhí)行一次
//Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS);
taskScheduler.schedule(new MyThread(), trigger);
}
private class MyThread implements Runnable {
@Override
public void run() {
log.info("定時執(zhí)行線程名稱=【{}】,執(zhí)行時間=【{}】", Thread.currentThread().getName(), DateUtil.date());
}
}
}
效果就是每個3秒執(zhí)行一次

websocket+定時任務(wù)實時推送
實現(xiàn)的業(yè)務(wù)需求如下:客戶端連上來以后就每隔3秒向客戶端實時推送消息。有關(guān)websocket的實現(xiàn)見文章websocket簡單實現(xiàn)
TestWebsocket.java
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/**
* 測試websocket
*
* @author yjj
* @version 1.0
* @since 2022 -12-28 14:55:29
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TestWebsocket implements WebSocketHandler {
protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>();
/**
* 定時任務(wù)集合
*/
Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>();
/**
* taskScheduler
*/
private final TaskScheduler taskScheduler;
/**
* 建立連接后操作
*
* @param session 連接session信息
* @throws Exception exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sendMessage("連接成功~~~~~~,sessionId=" + session.getId());
WEB_SOCKET_SESSIONS.add(session);
//設(shè)置定時任務(wù),每隔3s執(zhí)行一次
Trigger trigger = new CronTrigger("0/3 * * * * *");
//開啟一個定時任務(wù)
ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger);
//根據(jù)session連接id定時任務(wù)線程存到map中
stringScheduledFutureMap.put(session.getId(), schedule);
}
private class CustomizeTask implements Runnable {
private final String sessionId;
CustomizeTask(String sessionId) {
this.sessionId = sessionId;
}
@Override
public void run() {
try {
String message = CharSequenceUtil.format("定時執(zhí)行線程名稱=【{}】,執(zhí)行時間=【{}】", Thread.currentThread().getName(), DateUtil.date());
sendMessage(JSONUtil.toJsonStr(message), sessionId);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 接收到消息后的處理
*
* @param session 連接session信息
* @param message 信息
* @throws Exception exception
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
sendMessage("接收到的消息為=【" + message + "】,sessionId=【" + session.getId() + "】,回復(fù)消息=【你好呀!】");
}
/**
* ws連接出錯時調(diào)用
*
* @param session session連接信息
* @param exception exception
* @throws Exception exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
if (session.isOpen()) {
sendMessage("ws連接出錯,即將關(guān)閉此session,sessionId=【" + session.getId() + "】");
session.close();
}
WEB_SOCKET_SESSIONS.remove(session);
}
/**
* 連接關(guān)閉后調(diào)用
*
* @param session session連接信息
* @param closeStatus 關(guān)閉狀態(tài)
* @throws Exception exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
if (session.isOpen()) {
sendMessage("ws連接即將關(guān)閉此session,sessionId=【" + session.getId() + "】");
session.close();
}
WEB_SOCKET_SESSIONS.remove(session);
String sessionId = session.getId();
ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class);
if (scheduledFuture != null) {
//暫停對應(yīng)session的開啟的定時任務(wù)
scheduledFuture.cancel(true);
//集合移除
stringScheduledFutureMap.remove(sessionId);
}
}
/**
* 是否支持分片消息
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 群發(fā)發(fā)送消息
*
* @param message 消息
* @throws IOException ioException
*/
public void sendMessage(String message) throws IOException {
if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
webSocketSession.sendMessage(new TextMessage(message));
}
}
}
/**
* 發(fā)給指定連接消息
*
* @param message 消息
* @throws IOException ioException
*/
public void sendMessage(String message, String sessionId) throws IOException {
if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
if (sessionId.equals(webSocketSession.getId())) {
webSocketSession.sendMessage(new TextMessage(message));
}
}
}
}
}
websocket綁定URL
import com.yjj.test.websocket.TestWebsocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import javax.annotation.Resource;
/**
* websocket配置
*
* @author yjj
* @version 1.0
* @since 2022 -12-28 15:10:11
*/
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Resource
private TestWebsocket testWebsocket;
/**
* Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
*
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*");
}
}
websocket與定時任務(wù)同時存在時,需要加入配置定義線程池進行線程的管理
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* 當(dāng)定時任務(wù)和websocket同時存在時報錯解決
*
* @author yjj
* @version 1.0
* @since 2022 -04-28 17:35:54
*/
@Configuration
public class ScheduledConfig {
/**
* Schedule本身是單線程執(zhí)行的
*
* @return the task scheduler
*/
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
scheduling.setPoolSize(20);
return scheduling;
}
}
效果如下
連接上以后服務(wù)每隔3秒會向客戶端實時推送消息

到此這篇關(guān)于spring中websocket定時任務(wù)實現(xiàn)實時推送的文章就介紹到這了,更多相關(guān)spring websocket實時推送內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot+Vue實現(xiàn)Socket通知推送的完整步驟
- Springboot集成SSE實現(xiàn)單工通信消息推送流程詳解
- SpringBoot整合WebSocket實現(xiàn)后端向前端主動推送消息方式
- Spring?Boot?使用?SSE?方式向前端推送數(shù)據(jù)詳解
- SpringBoot+WebSocket實現(xiàn)消息推送功能
- Springboot整合企業(yè)微信機器人助手推送消息的實現(xiàn)
- SpringBoot整合WxJava開啟消息推送的實現(xiàn)
- SpringBoot2.0集成WebSocket實現(xiàn)后臺向前端推送信息
- SpringBoot+WebSocket+Netty實現(xiàn)消息推送的示例代碼
- Spring SseEmitter推送消息及常用方法
相關(guān)文章
Java+Windows+ffmpeg實現(xiàn)視頻轉(zhuǎn)換功能
這篇文章主要為大家詳細(xì)介紹了Java+Windows+ffmpeg實現(xiàn)視頻轉(zhuǎn)換功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-12-12
詳述IntelliJ IDEA提交代碼前的 Code Analysis 機制(小結(jié))
本篇文章主要介紹了詳述IntelliJ IDEA提交代碼前的 Code Analysis 機制(小結(jié)),具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-11-11
SpringBoot自動配置Quartz的實現(xiàn)步驟
本文主要介紹了SpringBoot自動配置Quartz的實現(xiàn)步驟,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-11-11
解決Springboot配置excludePathPatterns不生效的問題
這篇文章主要介紹了解決Springboot配置excludePathPatterns不生效的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10
學(xué)習(xí)Java的static與final關(guān)鍵字
本篇文章給大家詳細(xì)分析了Java的static與final關(guān)鍵字知識點以及相關(guān)代碼分享,有需要的讀者跟著學(xué)習(xí)下吧。2018-03-03

