詳解Spring Cloud微服務(wù)架構(gòu)下的WebSocket解決方案
WebSocket在現(xiàn)代瀏覽器中的應(yīng)用已經(jīng)算是比較普遍了,在某些業(yè)務(wù)場(chǎng)景下,要求必須能夠在服務(wù)器端推送消息至客戶端。在沒(méi)有WebSocket的年代,我們使用過(guò)dwr,在那個(gè)時(shí)候dwr真實(shí)一個(gè)非常棒的方案。但是在WebSocket興起之后,我們更愿意使用標(biāo)準(zhǔn)實(shí)現(xiàn)來(lái)解決問(wèn)題、
首先交代一下,本篇文章不講解WebSocket的配置,主要講的是針對(duì)在微服務(wù)架構(gòu)集群模式下解決方案的選擇。
微服務(wù)架構(gòu)大家應(yīng)該都不陌生了,在微服務(wù)架構(gòu)下,服務(wù)是分布式的,而且為了保證業(yè)務(wù)的可用性,每個(gè)服務(wù)都是以集群的形式存在。在集群模式下,要保證集群的每一個(gè)節(jié)點(diǎn)的訪問(wèn)得到相同的結(jié)果就需要做到數(shù)據(jù)一致性,如緩存、session等。
微服務(wù)集群緩存通常使用分布式緩存redis解決,session一致性也通常會(huì)通過(guò)redis解決,但是現(xiàn)在更流行的是無(wú)狀態(tài)的Http,即無(wú)session化,最常見(jiàn)的解決方案就是OAuth。
WebSocket有所不同,它是與服務(wù)端建立一個(gè)長(zhǎng)連接,在集群模式下,顯然不可能把前端與服務(wù)集群中的每一個(gè)節(jié)點(diǎn)建立連接,一個(gè)可行的思路是像解決http session的共享一樣,通過(guò)redis來(lái)實(shí)現(xiàn)websocket的session共享,但是websocket session的數(shù)量是遠(yuǎn)多于http session的數(shù)量的(因?yàn)槊看蜷_(kāi)一個(gè)頁(yè)面都會(huì)建立一個(gè)websocket連接),所以隨著用戶量的增長(zhǎng),共享的數(shù)據(jù)量太大,很容易造成瓶頸。
另一個(gè)思路是,websocket總歸會(huì)與集群中某個(gè)節(jié)點(diǎn)建立連接,那么,只要找到連接所在的節(jié)點(diǎn),就可以向服務(wù)端推送消息了,那么要解決的問(wèn)題就是如何找到一個(gè)websocket連接所在的節(jié)點(diǎn)。要找到連接在哪個(gè)節(jié)點(diǎn)上,我們需要一個(gè)唯一的標(biāo)識(shí)符用于尋找連接,然而在基于stomp的發(fā)布-訂閱模式下,一個(gè)消息的推送可能是面向若干個(gè)連接的,可能分布在集群中的每一個(gè)節(jié)點(diǎn)上,這樣去尋找連接的代價(jià)也很高。既然這樣,我們不妨換種思路,每一個(gè)websocket消息,我們?cè)诩旱拿總€(gè)節(jié)點(diǎn)上都進(jìn)行推送,訂閱了該消息的連接,不管有一個(gè)還是一萬(wàn)個(gè),最終肯定都能收到這個(gè)消息?;谶@個(gè)思路,我們做了一些技術(shù)選型:
- RabbitMQ
- Spring Cloud Stream
首先說(shuō)RabbitMQ,高級(jí)消息隊(duì)列,可以實(shí)現(xiàn)消息廣播(當(dāng)然kafka一樣可以做到,這里只介紹一種),另一項(xiàng)技術(shù)是Spring Cloud Stream,stream是一個(gè)用于構(gòu)建高度可擴(kuò)展事件驅(qū)動(dòng)型微服務(wù)的框架,并且它可以跟RabbitMQ、Kafka以及其他多種消息服務(wù)集成,使用了stream,要把rabbitmq換成kafka只不過(guò)是改改配置的事情。接下來(lái)重點(diǎn)介紹使用方法:
引入依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
配置Binder
binder是stream中的重要概念,是用于配置用于stream發(fā)布和訂閱事件的消息中間件。先看一段配置:
spring:
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
username: username
password: password
virtual-host: /
配置中的 defaultRabbit 是binder的名稱,一會(huì)會(huì)在其他配置中引用,type指定了消息中間件的類型,environment是對(duì)消息中間件的配置,這里的配置結(jié)構(gòu)和spring.rabbitmq命名空間下的配置項(xiàng)一模一樣的,可以參照著進(jìn)行配置(這樣配置的作用是可以把stream的rabbitmq配置和項(xiàng)目中其他地方使用的rabbitmq區(qū)分開(kāi),如果這里不配置environment,binder會(huì)沿用spring.rabbitmq命名空間下的配置),比如你的項(xiàng)目中的rabbitmq的配置是這樣的:
spring: rabbitmq: host: localhost username: username password: password virtual-host: /
那上門的binder的environment配置完全可以去掉。
消息流與binder的綁定
微服務(wù)要接收揮著發(fā)布事件消息,根據(jù)spring cloud stream的名字,顧名思義,需要使用流,所以需要在配置中聲明兩個(gè)事件流,一個(gè)輸入流,一個(gè)輸出流:
spring:
cloud:
stream:
bindings:
websocketMessageIn:
destination: websocketMessage
binder: defaultRabbit
websocketMessageOut:
destination: websocketMessage
binder: defaultRabbit
這里我們看到,事件流引用了binder,表示這兩個(gè)流使用rabbitmq這個(gè)中間件(看到這里想必大家已經(jīng)明白了,在一個(gè)項(xiàng)目中完全可以同時(shí)使用rabbit和kafka作為事件流的消息中間件)。
websocketMessageIn,websocketMessageOut是事件流的名字(可以自己隨便起),destination指定了兩個(gè)事件流的destination是同一個(gè),這決定了寫入和讀取是指向同一個(gè)地方(不一定是同一個(gè)消息隊(duì)列)。
事件流聲明
事件流使用接口進(jìn)行定義:
/**
* websocket消息事件流接口
* Created by 吳昊 on 18-11-8.
*
* @author 吳昊
* @since 1.4.3
*/
interface WebSocketMessageStream {
companion object {
const val INPUT: String = "webSocketMessageIn"
const val OUTPUT: String = "webSocketMessageOut"
}
/**
* 輸入
*/
@Input(INPUT)
fun input(): SubscribableChannel
/**
* 輸出
*/
@Output(OUTPUT)
fun output(): MessageChannel
}
聲明事件流接口,這里面定義了兩個(gè)常量,分別對(duì)應(yīng)配置中的兩個(gè)流名稱,通過(guò)調(diào)用input()方法獲取輸入流,通過(guò)調(diào)用output()獲取輸出流。
該接口的實(shí)現(xiàn)由spring cloud stream完成,不需要自己實(shí)現(xiàn)。
使用事件流
聲明一個(gè)bean:
@Component
@EnableBinding(WebSocketMessageStream::class)
class WebSocketMessageService {
……
這里的@EnableBinding 注解指明了事件流接口類,只有添加了這個(gè)注解(要能被Spring識(shí)別到,可以加在入口類上,也可以加在@Configuration注解的類上),該接口才會(huì)被實(shí)現(xiàn),并且加入到Spring的容器中(可以注入)。
上面WebSocketMessageService的內(nèi)容如下:
@Autowired
private lateinit var stream: WebSocketMessageStream
@Autowired
private lateinit var template: SimpMessagingTemplate
@StreamListener(WebSocketMessageStream.INPUT)
fun messageReceived(message: WebSocketMessage) {
template.convertAndSend(message.destination, message.body)
}
fun send(destination: String, body: Any) {
stream.output().send(
MutableMessage(WebSocketMessage(destination, body))
)
}
接收消息
@StreamListener 注解指明了要監(jiān)聽(tīng)的事件流,方法接收的參數(shù)即事件的消息內(nèi)容(使用jackson反序列化),這里的messageReceived方法直接將接收到的消息直接用websocket發(fā)送給前端
發(fā)送消息
同樣,發(fā)送也很簡(jiǎn)單,將消息直接發(fā)送到輸入流中,上面的send方法即是將原本應(yīng)該用SimpMessagingTemplate發(fā)送給websocket的消息發(fā)送到spring cloud stream的事件流中。這樣做以后,項(xiàng)目中所有需要向前端推送webSocket消息的操作都應(yīng)該調(diào)用send方法來(lái)進(jìn)行。
講到這里大家可能還有點(diǎn)糊涂,也有一些疑問(wèn),為什么這樣每個(gè)微服務(wù)節(jié)點(diǎn)就能收到事件消息了?或者單個(gè)節(jié)點(diǎn)接收事件消息和多個(gè)節(jié)點(diǎn)接收的配置是怎么控制的。各位不要著急,待我慢慢道來(lái),接下來(lái)就要結(jié)合rabbit的知識(shí)來(lái)講解 了:
首先看一下rabbit的消息隊(duì)列:

從圖中看到,存在多個(gè)以webSocketMessage開(kāi)頭的隊(duì)列,這是每一個(gè)微服務(wù)節(jié)點(diǎn)創(chuàng)建了一個(gè)消息隊(duì)列,再來(lái)看exchange:

exchange綁定的消息隊(duì)列

這里的exchange名稱和上面消息隊(duì)列的名稱前綴均是webSocketMessage, 這個(gè)都是 由前面的binding配置中的destination指定的,和destination名稱保持一致
當(dāng)應(yīng)用向輸入流中寫入事件時(shí),使用destination作為key(即webSocketMessage),將消息寫入名為webSocketMessage的exchange,由于exchange綁定的消息隊(duì)列前綴均為webSocketMessage且routing key都是#,所以exchange會(huì)將消息路由到每一個(gè)webSocketMessage開(kāi)頭的消息隊(duì)列上(這里涉及到rabbitmq的知識(shí)點(diǎn),如過(guò)不懂請(qǐng)自行查閱資料),這樣每一個(gè)微服務(wù)都能接收到相同的消息。
我們?cè)賮?lái)看前面提出的問(wèn)題,這樣的配置可以把消息推送到每一個(gè)微服務(wù)節(jié)點(diǎn),那么如果需要一個(gè)消息只被一個(gè)節(jié)點(diǎn)接收,該怎么配置呢?很簡(jiǎn)單,一個(gè)配置項(xiàng)就可以搞定:
spring:
cloud:
stream:
bindings:
websocketMessageIn:
group: test
destination: websocketMessage
binder: defaultRabbit
可以看到,相比前面的配置,僅僅多了一個(gè)group的配置,這樣配置之后,rabbitmq會(huì)生成一個(gè)名為websocketMessage.test的消息隊(duì)列(前面講到的每個(gè)微服務(wù)建立的消息隊(duì)列是自動(dòng)刪除的,即微服務(wù)斷開(kāi)連接后消息隊(duì)列就被刪除,而這個(gè)消息隊(duì)列是持久化的,也就是即使所有的微服務(wù)節(jié)點(diǎn)全部斷開(kāi)連接也不會(huì)被刪除),所有的微服務(wù)節(jié)點(diǎn)監(jiān)聽(tīng)這一個(gè)隊(duì)列,當(dāng)隊(duì)列中有消息時(shí),只會(huì)被一個(gè)節(jié)點(diǎn)消費(fèi)。
要講的內(nèi)容到此結(jié)束,spring cloud stream的配置遠(yuǎn)不止這些,但是這些配置已足夠完成我所需要做的事情,其他的配置請(qǐng)參考spring cloud stream官方文檔:
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
IDEA-Maven項(xiàng)目的jdk版本設(shè)置方法
我們需要設(shè)置jdk的版本,不然會(huì)提示導(dǎo)致語(yǔ)法錯(cuò)誤,這篇文章主要介紹了IDEA-Maven項(xiàng)目的jdk版本設(shè)置方法,小編覺(jué)得不錯(cuò),一起來(lái)了解一下2019-04-04
Java中的三種校驗(yàn)注解的使用(@Valid,@Validated和@PathVariable)
本文主要介紹了Java中的三種校驗(yàn)注解的使用(@Valid,@Validated和@PathVariable),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04
IDEA 2021配置JavaWeb項(xiàng)目超詳細(xì)教程
本文通過(guò)圖文并茂的形式給大家介紹IDEA 2021配置JavaWeb項(xiàng)目的過(guò)程,內(nèi)容簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-08-08
java創(chuàng)建子類對(duì)象設(shè)置并調(diào)用父類的變量操作
這篇文章主要介紹了java創(chuàng)建子類對(duì)象設(shè)置并調(diào)用父類的變量操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01
SpringBoot項(xiàng)目整合MybatisPlus并使用SQLite作為數(shù)據(jù)庫(kù)的過(guò)程
SQLite是一個(gè)緊湊的庫(kù),啟用所有功能后,庫(kù)大小可以小于 750KiB, 具體取決于目標(biāo)平臺(tái)和編譯器優(yōu)化設(shè)置, 內(nèi)存使用量和速度之間需要權(quán)衡,這篇文章主要介紹了SpringBoot項(xiàng)目整合MybatisPlus并使用SQLite作為數(shù)據(jù)庫(kù),需要的朋友可以參考下2024-07-07
SpringData @Query和@Modifying注解原理解析
這篇文章主要介紹了SpringData @Query和@Modifying注解原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08

