spring配置websocket的完整流程
在 Spring Boot 中使用 Kotlin 配置 WebSocket 的完整流程如下(包含基礎配置、安全增強和性能優(yōu)化):
一、添加依賴 (build.gradle.kts 或 pom.xml)
// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-websocket")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin") // JSON 支持
}
二、基礎 WebSocket 配置
1. 啟用 WebSocket 支持
@Configuration
@EnableWebSocket
class WebSocketConfig : WebSocketConfigurer {
@Autowired
lateinit var myWebSocketHandler: MyWebSocketHandler
@Autowired
lateinit var handshakeInterceptor: AuthHandshakeInterceptor
override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
registry.addHandler(myWebSocketHandler, "/ws")
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins("*") // 生產(chǎn)環(huán)境應限制域名
}
}
三、核心組件實現(xiàn)
1. WebSocket 消息處理器
@Component
class MyWebSocketHandler : TextWebSocketHandler() {
private val sessions = ConcurrentHashMap<String, WebSocketSession>()
private val logger = LoggerFactory.getLogger(this::class.java)
// 連接建立
override fun afterConnectionEstablished(session: WebSocketSession) {
val userId = session.attributes["userId"] as? String
userId?.let {
sessions[it] = session
logger.info("User $it connected, total: ${sessions.size}")
} ?: run {
session.close(CloseStatus.BAD_DATA)
}
}
// 處理文本消息
override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
val payload = message.payload
logger.debug("Received message: $payload")
// 示例:廣播消息
sessions.values.forEach {
it.sendMessage(TextMessage("Broadcast: $payload"))
}
}
// 連接關(guān)閉
override fun afterConnectionClosed(session: WebSocketSession, status: CloseStatus) {
sessions.values.remove(session)
logger.info("Connection closed: ${status.reason}")
}
// 主動推送方法
fun sendToUser(userId: String, message: String) {
sessions[userId]?.sendMessage(TextMessage(message))
}
}
2. 握手攔截器(安全校驗)
@Component
class AuthHandshakeInterceptor : HandshakeInterceptor {
override fun beforeHandshake(
request: ServerHttpRequest,
response: ServerHttpResponse,
wsHandler: WebSocketHandler,
attributes: MutableMap<String, Any>
): Boolean {
// 1. 校驗 Token
val token = request.uri.query?.split("token=")?.get(1)
if (!validateToken(token)) {
response.statusCode = HttpStatus.UNAUTHORIZED
return false
}
// 2. 獲取用戶信息
val userId = parseUserIdFromToken(token)
attributes["userId"] = userId
// 3. 記錄握手日志
logHandshake(request.remoteAddress?.hostName)
return true
}
private fun validateToken(token: String?): Boolean {
// 實現(xiàn) JWT 校驗邏輯
return token?.startsWith("valid_") == true
}
}
四、客戶端連接示例(JavaScript + Kotlin)
1. Web 前端連接
const socket = new WebSocket(`ws://${location.host}/ws?token=valid_123`);
socket.onmessage = (event) => {
console.log('Received:', event.data);
};
// 發(fā)送心跳包保持連接
setInterval(() => {
socket.send(JSON.stringify({ type: "heartbeat" }));
}, 30000);
2. Android 客戶端(Kotlin)
class WebSocketClient(url: String) : WebSocketListener() {
private val client = OkHttpClient()
private var ws: WebSocket? = null
init {
val request = Request.Builder().url(url).build()
ws = client.newWebSocket(request, this)
}
override fun onMessage(webSocket: WebSocket, text: String) {
println("Received: $text")
}
fun sendMessage(msg: String) {
ws?.send(msg)
}
}
五、高級配置
1. 消息壓縮配置
@Bean
fun webSocketContainer(): ServletServerContainerFactoryBean {
return ServletServerContainerFactoryBean().apply {
setMaxTextMessageBufferSize(8192)
setMaxBinaryMessageBufferSize(8192)
setAsyncSendTimeout(30000L) // 30秒異步發(fā)送超時
}
}
2. 集群支持(Redis 廣播)
@Configuration
@EnableRedisRepositories
class RedisPubSubConfig {
@Bean
fun redisTemplate(connectionFactory: RedisConnectionFactory): RedisTemplate<String, String> {
return RedisTemplate<String, String>().apply {
setConnectionFactory(connectionFactory)
}
}
@Bean
fun topicListenerAdapter(handler: MessageListener): ChannelTopic {
return ChannelTopic("websocket-messages")
}
}
六、監(jiān)控與調(diào)試
1. 端點監(jiān)控
@RestController
class WebSocketMetricsController(
private val handler: MyWebSocketHandler
) {
@GetMapping("/metrics/websocket")
fun getMetrics(): Map<String, Any> {
return mapOf(
"activeConnections" to handler.getSessionCount(),
"lastMessageTime" to handler.getLastActivity()
)
}
}
2. 日志配置 (logback-spring.xml)
<logger name="org.springframework.web.socket" level="DEBUG"/> <logger name="com.example.websocket" level="TRACE"/>
七、常見問題解決方案
| 問題現(xiàn)象 | 解決方案 |
|---|---|
| 連接頻繁斷開 | 添加心跳機制,調(diào)整 setAsyncSendTimeout |
| 跨域失敗 | 精確配置 .setAllowedOrigins("https://your-domain.com") |
| 消息順序錯亂 | 使用 @SendToUser(destination = "/queue", broadcast = false) 點對點發(fā)送 |
| 內(nèi)存泄漏 | 定期檢查 sessions Map,添加連接超時清理邏輯 |
| 高并發(fā)時性能下降 | 啟用異步消息處理,使用 Redis Pub/Sub 分流消息 |
八、安全增強建議
啟用 WSS 協(xié)議:
# Nginx 配置示例 location /ws { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header X-Real-IP $remote_addr; proxy_ssl_protocols TLSv1.2 TLSv1.3; }限流防護:
@Bean fun webSocketRateLimiter(): WebSocketHandlerDecoratorFactory { return object : WebSocketHandlerDecoratorFactory { override fun decorate(handler: WebSocketHandler): WebSocketHandler { val rateLimiter = RateLimiter.create(100) // 100次/秒 return object : WebSocketHandlerDecorator(handler) { override fun handleMessage(session: WebSocketSession, message: WebSocketMessage<*>) { if (!rateLimiter.tryAcquire()) { session.close(CloseStatus.POLICY_VIOLATION) return } super.handleMessage(session, message) } } } } }
九、性能測試建議
使用 JMeter 壓測:
<!-- WebSocket 壓測計劃示例 --> <WebSocketSampler> <connectTime>5000</connectTime> <responseTimeout>10000</responseTimeout> <payload>{ "type": "stress", "data": "test" }</payload> </WebSocketSampler>監(jiān)控指標:
• 單節(jié)點最大連接數(shù)
• 消息往返延遲 (RTT)
• 內(nèi)存占用增長率
通過以上配置,可以實現(xiàn)一個高性能、安全可靠的企業(yè)級 WebSocket 服務,支持從開發(fā)到生產(chǎn)的全生命周期管理。
總結(jié)
到此這篇關(guān)于spring配置websocket的文章就介紹到這了,更多相關(guān)spring配置websocket內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java深入了解數(shù)據(jù)結(jié)構(gòu)之棧與隊列的詳解
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)中的棧與隊列,在Java的時候,對于棧與隊列的應用需要熟練的掌握,這樣才能夠確保Java學習時候能夠有扎實的基礎能力。本文小編就來詳細說說Java中的棧與隊列,需要的朋友可以參考一下2022-01-01

