SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼
引言
Redis Stream 是 Redis 5.0 引入的一種數(shù)據(jù)結(jié)構(gòu),用于處理日志類型的數(shù)據(jù)。它提供了高效、可靠的方式來處理和存儲時間序列數(shù)據(jù),如事件、消息等。其設(shè)計(jì)靈感源于 Kafka 和類似的消息隊(duì)列系統(tǒng),且完全集成在 Redis 中,利用了 Redis 的高性能和持久化特性。
依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
說明:此部分定義了 Redis 相關(guān)的依賴,確保項(xiàng)目能夠引入并使用 Spring Boot 提供的 Redis 啟動器。
RedisTemplate 配置
package com.mjg.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
// 注冊 Java 8 日期時間模塊
om.registerModule(new JavaTimeModule());
om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
jackson2JsonRedisSerializer.serialize(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key 采用 String 的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash 的 key 也采用 String 的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value 序列化方式采用 jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash 的 value 序列化方式采用 jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
說明:此配置類用于設(shè)置 RedisTemplate 的序列化方式,以滿足不同數(shù)據(jù)類型的存儲和讀取需求。
RedisStreamConfig
package com.mjg.config;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Properties;
@Slf4j
@RequiredArgsConstructor
@Configuration
public class RedisStreamConfig implements InitializingBean, DisposableBean {
private final RedisTemplate<String, Object> redisTemplate;
public static String streamName = "user-event-stream";
public static String userEventGroup = "user-event-group";
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
/**
* 消息偵聽器容器,用于監(jiān)聽 Redis Stream 中的消息
*
* @param connectionFactory Redis 連接工廠,用于創(chuàng)建 Redis 連接
* @param messageConsumer 消息消費(fèi)者,用于處理接收到的消息
* @return 返回 {@link StreamMessageListenerContainer}<{@link String}, {@link ObjectRecord}<{@link String}, {@link String}>> 類型的消息偵聽器容器
*/
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> messageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {
StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = streamContainer(streamName, connectionFactory, messageConsumer);
listenerContainer.start();
return listenerContainer;
}
/**
* 創(chuàng)建一個流容器,用于監(jiān)聽 Redis Stream 中的數(shù)據(jù)
*
* @param streamName Redis Stream 的名稱
* @param connectionFactory Redis 連接工廠
* @param streamListener 綁定的監(jiān)聽類
* @return 返回 StreamMessageListenerContainer 對象
*/
@SneakyThrows
private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String streamName, RedisConnectionFactory connectionFactory, StreamListener<String, ObjectRecord<String, String>> streamListener) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(5)) // 拉取消息超時時間
.batchSize(10) // 批量抓取消息
.targetType(String.class) // 傳遞的數(shù)據(jù)類型
.executor(threadPoolTaskExecutor)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
.create(connectionFactory, options);
// 指定消費(fèi)最新的消息
StreamOffset<String> offset = StreamOffset.create(streamName, ReadOffset.lastConsumed());
// 創(chuàng)建消費(fèi)者
StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = buildStreamReadRequest(offset, streamListener);
// 指定消費(fèi)者對象
container.register(streamReadRequest, streamListener);
return container;
}
/**
* 生成流讀取請求
*
* @param offset 偏移量,用于指定從 Redis Stream 中的哪個位置開始讀取消息
* @param streamListener 流偵聽器,用于處理接收到的消息
* @return 返回一個 StreamReadRequest 對象,表示一個流讀取請求
* @throws Exception 當(dāng) streamListener 無法識別為 MessageConsumer 類型時,拋出異常
*/
private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(StreamOffset<String> offset, StreamListener<String, ObjectRecord<String, String>> streamListener) throws Exception {
Consumer consumer;
if (streamListener instanceof MessageConsumer) {
consumer = Consumer.from(userEventGroup, InetAddress.getLocalHost().getHostName());
} else {
throw new Exception("無法識別的 stream key");
}
// 關(guān)閉自動 ack 確認(rèn)
return StreamMessageListenerContainer.StreamReadRequest.builder(offset)
.errorHandler((error) -> {
log.error(error.getMessage());
})
.cancelOnError(e -> false)
.consumer(consumer)
// 關(guān)閉自動 ack 確認(rèn)
.autoAcknowledge(false)
.build();
}
/**
* 檢查 Redis 版本是否符合要求
*
* @throws IllegalStateException 如果 Redis 版本小于 5.0.0 版本,拋出該異常
*/
private void checkRedisVersion() {
// 獲得 Redis 版本
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
Assert.notNull(info, "Redis info is null");
Object redisVersion = info.get("redis_version");
Integer anInt = Convert.toInt(redisVersion);
if (anInt < 5) {
throw new IllegalStateException(StrUtil.format("您當(dāng)前的 Redis 版本為 {},小于最低要求的 5.0.0 版本!", redisVersion));
}
}
@Override
public void destroy() throws Exception {
}
@Override
public void afterPropertiesSet() throws Exception {
checkRedisVersion();
StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
if (Boolean.FALSE.equals(redisTemplate.hasKey(streamName))) {
streamOperations.createGroup(streamName, ReadOffset.from("0"), userEventGroup);
}
}
}
說明:該配置類實(shí)現(xiàn)了對 Redis Stream 的相關(guān)配置,包括消息監(jiān)聽容器的創(chuàng)建、流讀取請求的生成、Redis 版本的檢查以及組的創(chuàng)建等功能。
生產(chǎn)者
package com.mjg.config;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Collections;
@Component
@RequiredArgsConstructor
@Slf4j
public class MessageProducer {
private final RedisTemplate<String, Object> redisTemplate;
public void sendMessage(String streamKey, Object message) {
RecordId recordId = redisTemplate
.opsForStream().add(StreamRecords.newRecord()
.ofMap(Collections.singletonMap("data", message))
.withStreamKey(streamKey));
if (recordId!= null) {
log.info("Message sent to Stream '{}' with RecordId: {}", streamKey, recordId);
}
}
}
說明:MessageProducer 類負(fù)責(zé)向 Redis Stream 發(fā)送消息。
消費(fèi)者
package com.mjg.config;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@Component
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {
private final RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String stream = message.getStream();
String messageId = message.getId().toString();
String messageBody = message.getValue();
System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);
System.out.println("Message body: " + messageBody);
// 消息應(yīng)答
redisTemplate.opsForStream().acknowledge(RedisStreamConfig.streamName, RedisStreamConfig.userEventGroup, message.getId());
}
}
說明:MessageConsumer 類實(shí)現(xiàn)了 StreamListener 接口,用于處理從 Redis Stream 接收到的消息,并進(jìn)行相應(yīng)的應(yīng)答操作。
測試
@RequiredArgsConstructor
@Slf4j
@RestController
public class MessageController {
public static String streamName = "user-event-stream";
private final MessageProducer messageProducer;
@GetMapping("/send")
public void send() {
messageProducer.sendMessage(streamName, "hello 啦啦啦啦" + LocalDateTime.now());
}
}
說明:MessageController 類中的 send 方法通過調(diào)用 MessageProducer 來發(fā)送消息到指定的 Redis Stream 中。

以上就是SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Redis Stream輕量消息隊(duì)列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解JavaEE 使用 Redis 數(shù)據(jù)庫進(jìn)行內(nèi)容緩存和高訪問負(fù)載
本篇文章主要介紹了JavaEE 使用 Redis 數(shù)據(jù)庫進(jìn)行內(nèi)容緩存和高訪問負(fù)載,具有一定的參考價值,有興趣的可以了解一下2017-08-08
Spring中ApplicationListener的使用解析
這篇文章主要介紹了Spring中ApplicationListener的使用解析,ApplicationContext事件機(jī)制是觀察者設(shè)計(jì)模式的實(shí)現(xiàn),通過ApplicationEvent類和ApplicationListener接口,需要的朋友可以參考下2023-12-12
老生常談Java中instanceof關(guān)鍵字的理解
java 中的instanceof 運(yùn)算符是用來在運(yùn)行時指出對象是否是特定類的一個實(shí)例。這篇文章主要介紹了老生常談Java中instanceof關(guān)鍵字的理解,需要的朋友可以參考下2018-10-10
詳談hibernate,jpa與spring?data?jpa三者之間的關(guān)系
這篇文章主要介紹了hibernate,jpa與spring?data?jpa三者之間的關(guān)系,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11
Java實(shí)現(xiàn)發(fā)送HTML內(nèi)容并帶附件的電子郵件
這篇文章主要為大家詳細(xì)介紹了如何使用Java實(shí)現(xiàn)發(fā)送HTML內(nèi)容并帶附件的電子郵件,文中的示例代碼講解詳細(xì),有需要的小伙伴可以參考一下2025-01-01
Spring MVC過濾器-登錄過濾的代碼實(shí)現(xiàn)
本篇文章主要介紹了Spring MVC過濾器-登錄過濾,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧。2017-01-01

