SpringBoot中使用Redis?Stream實(shí)現(xiàn)消息監(jiān)聽示例
Demo環(huán)境
- JDK8
- Maven3.6.3
- springboot2.4.3
- redis_version:6.2.1
倉庫地址
https://gitee.com/hlovez/redismq.git.
POM依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.3</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>vip.huhailong</groupId> <artifactId>redismq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>redismq</name> <description>base redis stream mq</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
這里是一個(gè)簡單的Demo,所以關(guān)于redis的一些序列化配置就省略了。
配置監(jiān)聽消息類
配置監(jiān)聽消息類,這里類需要實(shí)現(xiàn)StreamListener接口,該接口下只有一個(gè)要實(shí)現(xiàn)的方法——onMessage方法,代碼:
package vip.huhailong.redismq.redistool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* @author Huhailong
* @Description 監(jiān)聽消息
* @Date 2021/3/10.
*/
@Slf4j
@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> entries) {
log.info("接受到來自redis的消息");
System.out.println("message id "+entries.getId());
System.out.println("stream "+entries.getStream());
System.out.println("body "+entries.getValue());
}
}
配置完該類后我們再創(chuàng)建一個(gè)類將該監(jiān)聽器注入進(jìn)去,代碼:
package vip.huhailong.redismq.config;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import vip.huhailong.redismq.redistool.ListenerMessage;
import java.time.Duration;
/**
* @author Huhailong
* @Description
* @Date 2021/3/12.
*/
@Configuration
public class RedisStreamConfig {
@Autowired
private ListenerMessage streamListener;
@Bean
public Subscription subscription(RedisConnectionFactory factory){
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
var listenerContainer = StreamMessageListenerContainer.create(factory,options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
listenerContainer.start();
return subscription;
}
}
代碼分析:
首先將我們實(shí)現(xiàn)了StreamListener的監(jiān)聽器類注入。
subscription方法返回的是一個(gè)Subscription類型,它是與當(dāng)前正在運(yùn)行任務(wù)的鏈接,可以理解為訂閱的鏈接,它有倆個(gè)方法
- boolean await(Duration timeout): 當(dāng)訂閱變?yōu)榛顒?dòng)或超時(shí)時(shí),同步阻塞呼叫將返回
- boolean isActive(): 如果當(dāng)前正在訂閱則為true
代碼中的var是使用了Lombok的可變局部變量。主要是為了方便
StreamMessageListenerContainer: 消息偵聽容器,不能在外部實(shí)現(xiàn)。創(chuàng)建后,StreamMessageListenerContainer可以訂閱Redis流并使用傳入的消息。 StreamMessageListenerContainer允許多個(gè)流讀取請求,并為每個(gè)讀取請求返回一個(gè)Subscription句柄。取消訂閱最終將終止后臺(tái)輪詢。使用鍵和值序列化器轉(zhuǎn)換消息以支持各種序列化策略。具體文檔
__StreamMessageListenerContainerOptions: __ 它是上面4的選項(xiàng),代碼中pollTimeout表示輪詢超時(shí)時(shí)間
create方法使用給定的RedisConnectionFactory和上面的配置選項(xiàng)創(chuàng)建偵聽器。
接下來使用receiveAutoAck創(chuàng)建一個(gè)新訂閱,注意這里接受到消息后會(huì)被自動(dòng)的確認(rèn),如果不想自動(dòng)確認(rèn)請使用其他的創(chuàng)建訂閱方式。該方法共有三個(gè)參數(shù)
- 消費(fèi)組 consumer group ,它不能為null (Consumer類型)
- stream offset ,stream的偏移量(StreamOffset 類型)
- listener 不能為null (StreamListener<K,V> 類型)
代碼中表示消費(fèi)者來自名稱為mygroup的組,消費(fèi)者名稱為huhailong,這里偏移量設(shè)置為了lastConsumed,它表示讀取ID大于消費(fèi)者組使用的最后一個(gè)元素的所有新到達(dá)的元素。
現(xiàn)在就可以運(yùn)行項(xiàng)目來驗(yàn)證了,將項(xiàng)目運(yùn)行起來后通過終端給對應(yīng)key的stream添加一條消息
> XADD mystream * message springboot
這時(shí)可以看到spring boot的控制臺(tái)打印除了一下消息:
message id 1615532778588-0
stream mystream
body {message=springboot}
說明偵聽成功,它會(huì)一直處于監(jiān)聽狀態(tài),只要對應(yīng)key的stream添加了新的消息都會(huì)被偵聽到,到此也就簡單的實(shí)現(xiàn)了消息隊(duì)列功能。
監(jiān)聽倆個(gè)stream的實(shí)現(xiàn)
有網(wǎng)友想實(shí)現(xiàn)倆個(gè)或倆個(gè)以上的stream監(jiān)聽,可以這樣實(shí)現(xiàn)(有其他更好的方法請留言評論,謝謝)
在上面監(jiān)聽一個(gè)stream的基礎(chǔ)上在RedisStreamConfig類里增加監(jiān)聽方法,如下:
@Configuration
public class RedisStreamConfig {
@Autowired
private ListenerMessage streamListener;
@Bean
public Subscription subscription(RedisConnectionFactory factory){
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
initStream("mystream","mygroup"); //詳細(xì)描述請看下方的問題補(bǔ)充——初始化key和group
var listenerContainer = StreamMessageListenerContainer.create(factory,options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscription2(RedisConnectionFactory factory){
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
initStream("mystream","mygroup"); //詳細(xì)描述請看下方的問題補(bǔ)充——初始化key和group
var listenerContainer = StreamMessageListenerContainer.create(factory,options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
StreamOffset.create("mystream2", ReadOffset.lastConsumed()),streamListener);
listenerContainer.start();
return subscription;
}
}
記得創(chuàng)建該stream和對應(yīng)的組,讓后啟動(dòng)程序,在控制臺(tái)模擬添加數(shù)據(jù),如圖:

然后idea控制臺(tái)的打印可以看到分別接收到了來自不同的stream的消息

其他類和配置與監(jiān)聽一個(gè)的時(shí)候相同,不需要改變。
【問題補(bǔ)充】確認(rèn)完消息刪除消息
消息確認(rèn)完后消息實(shí)際上沒有在redis中消失,這也是redis stream中的一個(gè)特性,如果要想真正的刪除該消息,需要我們進(jìn)行指定字段ID刪除,如下:
在RedisUtil中添加刪除方法:
public void delField(String key, String fieldId){
redisTemplate.opsForStream().delete(key,fieldId);
}
然后再ListenerMessage中的onMessage方法里處理完消息后添加刪除該消息的方法:
@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
RedisUtil redisUtil;
@Override
public void onMessage(MapRecord<String, String, String> entries) {
log.info("接受到來自redis的消息");
System.out.println("message id "+entries.getId().getValue());
System.out.println("stream "+entries.getStream());
System.out.println("body "+entries.getValue());
//下面是刪除該消息的方法
redisUtil.delField("mystream",entries.getId().getValue());
}
}
【問題補(bǔ)充】自動(dòng)初始化stream的key和group問題-最新更新-2021年12月4日
最近看到評論大部分有報(bào)找不到key和group的錯(cuò)信息,這個(gè)是因?yàn)闆]有初始化導(dǎo)致的,當(dāng)時(shí)在寫這個(gè)demo的時(shí)候?yàn)榱撕唵螠y試直接寫死了我已經(jīng)存在的key和group,部分朋友不知道這個(gè)是需要初始化的,所以我更新了一下代碼,增加了初始化key和group的方法,這樣大家直接運(yùn)行代碼就可以了,不需要在做多余的操作。
RedisUtil中增加增加的代碼:
public RecordId addStream(String key, Map<String,Object> message){
RecordId add = redisTemplate.opsForStream().add(key, message);
return add; //返回增加后的id
}
public void addGroup(String key, String groupName){
redisTemplate.opsForStream().createGroup(key,groupName);
}
/**
* 用來判斷key是否存在
*/
public boolean hasKey(String key){
if(key==null){
return false;
}else{
return redisTemplate.hasKey(key);
}
}
然后再RedisStreamConfig中增加初始化方法,這里注意要判斷key是否存在:
private void initStream(String key, String group){
//判斷key是否存在,如果不存在則創(chuàng)建
boolean hasKey = redisUtil.hasKey(key);
if(!hasKey){
Map<String,Object> map = new HashMap<>();
map.put("field","value");
RecordId recordId = redisUtil.addStream(key, map);
redisUtil.addGroup(key,group);
//將初始化的值刪除掉
redisUtil.delField(key,recordId.getValue());
log.info("stream:{}-group:{} initialize success",key,group);
}
}
然后再上面配置監(jiān)聽的方法里調(diào)用這個(gè)初始化方法就可以了
@Bean
public Subscription subscription(RedisConnectionFactory factory){
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
initStream("mystream","mygroup"); //調(diào)用初始化
var listenerContainer = StreamMessageListenerContainer.create(factory,options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
listenerContainer.start();
return subscription;
}
https://gitee.com/hlovez/redismq.git已更新,代碼已優(yōu)化整理上傳
到此這篇關(guān)于SpringBoot中使用Redis Stream實(shí)現(xiàn)消息監(jiān)聽示例的文章就介紹到這了,更多相關(guān)SpringBoot 消息監(jiān)聽內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot集成Redis實(shí)現(xiàn)消息隊(duì)列的方法
- SpringBoot+Redis實(shí)現(xiàn)消息的發(fā)布與訂閱的示例代碼
- springboot整合redis之消息隊(duì)列
- SpringBoot整合Redis實(shí)現(xiàn)消息發(fā)布與訂閱的示例代碼
- SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
- springboot集成redis實(shí)現(xiàn)消息的訂閱與發(fā)布
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
相關(guān)文章
MyBatisPlus+Spring實(shí)現(xiàn)聲明式事務(wù)的方法實(shí)現(xiàn)
本文主要介紹了MyBatisPlus+Spring實(shí)現(xiàn)聲明式事務(wù)的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-07-07
springboot2如何集成ElasticSearch6.4.3
這篇文章主要介紹了springboot2如何集成ElasticSearch6.4.3問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07
Spring Boot項(xiàng)目如何同時(shí)支持HTTP和HTTPS協(xié)議的實(shí)現(xiàn)
這篇文章主要介紹了Spring Boot項(xiàng)目如何同時(shí)支持HTTP和HTTPS協(xié)議的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10
springboot如何獲取yaml/yml(或properties)配置文件信息
在SpringBoot項(xiàng)目中,讀取配置文件信息是常見需求,可以通過@Autowired注入Environment類,使用@Value注解直接注入配置信息,或定義工具類結(jié)合ApplicationRunner進(jìn)行高級配置信息獲取,特別提到2024-11-11
IntelliJ IDEA中ajax開發(fā)實(shí)現(xiàn)分頁查詢示例
這篇文章主要介紹了IntelliJ IDEA中ajax開發(fā)實(shí)現(xiàn)分頁查詢,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-03-03
java實(shí)現(xiàn)PPT轉(zhuǎn)PDF出現(xiàn)中文亂碼問題的解決方法
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)PPT轉(zhuǎn)PDF出現(xiàn)中文亂碼問題的解決方法,進(jìn)行了詳細(xì)的問題分析,需要的朋友可以參考下2015-11-11
Java數(shù)據(jù)存儲(chǔ)的“雙子星”對決(Map和Set的區(qū)別)
文章主要介紹了Java中Map和Set兩種數(shù)據(jù)結(jié)構(gòu)的定義、實(shí)現(xiàn)、方法及應(yīng)用場景,Map用于存儲(chǔ)鍵值對,鍵唯一,值可重復(fù);Set用于存儲(chǔ)唯一元素,無序,兩者都提供了豐富的操作方法,如添加、刪除、查找等,感興趣的朋友一起看看吧2025-02-02
Spring?WebFlux怎么進(jìn)行異常處理源碼解析
這篇文章主要為大家介紹了Spring?WebFlux怎么進(jìn)行異常處理源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08

