一文帶你搞懂Redis Stream的6種消息處理模式
Redis 5.0版本引入的Stream數(shù)據(jù)類型,為Redis生態(tài)帶來了強(qiáng)大而靈活的消息隊(duì)列功能,彌補(bǔ)了之前發(fā)布/訂閱模式的不足,如消息持久化、消費(fèi)者組、消息確認(rèn)等特性。
Redis Stream結(jié)合了傳統(tǒng)消息隊(duì)列和時(shí)序數(shù)據(jù)庫的特點(diǎn),適用于日志收集、事件驅(qū)動(dòng)應(yīng)用、實(shí)時(shí)分析等多種場景。
本文將介紹Redis Stream的6種消息處理模式。
1. 簡單消費(fèi)模式(Simple Consumption)
基本概念
簡單消費(fèi)模式是Redis Stream最基礎(chǔ)的使用方式,不使用消費(fèi)者組,直接讀取流中的消息。生產(chǎn)者將消息追加到流中,消費(fèi)者通過指定起始ID來讀取消息。
核心命令
# 發(fā)布消息 XADD stream_name [ID] field value [field value ...] # 讀取消息 XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id
實(shí)現(xiàn)示例
Redis CLI
# 添加消息到stream
> XADD mystream * sensor_id 1234 temperature 19.8 humidity 56
"1647257548956-0"
# 從頭開始讀取所有消息
> XREAD STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1647257548956-0"
2) 1) "sensor_id"
2) "1234"
3) "temperature"
4) "19.8"
5) "humidity"
6) "56"
# 從指定ID開始讀取
> XREAD STREAMS mystream 1647257548956-0
(empty list or set)
# 從最新的消息ID之后開始讀?。ㄗ枞却孪ⅲ?
> XREAD BLOCK 5000 STREAMS mystream $
(nil)Java Spring Boot示例
@Service
public class SimpleStreamService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 發(fā)布消息到Stream
*/
public String publishEvent(String streamKey, Map<String, Object> eventData) {
StringRecord record = StreamRecords.string(eventData).withStreamKey(streamKey);
return redisTemplate.opsForStream().add(record).getValue();
}
/**
* 從指定位置開始讀取消息
*/
public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) {
StreamReadOptions readOptions = StreamReadOptions.empty().count(count);
return redisTemplate.opsForStream().read(readOptions, StreamOffset.from(streamKey, ReadOffset.from(startId)));
}
/**
* 阻塞式讀取消息
*/
public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) {
StreamReadOptions readOptions = StreamReadOptions.empty().count(10).block(Duration.ofMillis(timeoutMillis));
return redisTemplate.opsForStream().read(readOptions, StreamOffset.latest(streamKey));
}
}使用場景
- 簡單的事件日志記錄
- 單一消費(fèi)者場景
- 時(shí)間序列數(shù)據(jù)收集
- 開發(fā)和調(diào)試階段
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 實(shí)現(xiàn)簡單,無需創(chuàng)建和管理消費(fèi)者組
- 直接控制從哪個(gè)位置開始消費(fèi)消息
- 適合單個(gè)消費(fèi)者場景
缺點(diǎn)
- 無法實(shí)現(xiàn)負(fù)載均衡
- 無法追蹤消息確認(rèn)狀態(tài)
- 需要手動(dòng)管理已讀消息ID
- 服務(wù)重啟需自行記錄上次讀取位置
2. 消費(fèi)者組模式(Consumer Groups)
基本概念
消費(fèi)者組允許多個(gè)消費(fèi)者共同處理一個(gè)流的消息,實(shí)現(xiàn)負(fù)載均衡,并提供消息確認(rèn)機(jī)制,確保消息至少被處理一次。每個(gè)消費(fèi)者組維護(hù)自己的消費(fèi)位置,不同消費(fèi)者組之間互不干擾。
核心命令
# 創(chuàng)建消費(fèi)者組 XGROUP CREATE stream_name group_name [ID|$] [MKSTREAM] # 從消費(fèi)者組讀取消息 XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|ID] # 確認(rèn)消息處理完成 XACK stream_name group_name message_id [message_id ...]
實(shí)現(xiàn)示例
Redis CLI
# 創(chuàng)建消費(fèi)者組
> XGROUP CREATE mystream processing-group $ MKSTREAM
OK
# 消費(fèi)者1讀取消息
> XREADGROUP GROUP processing-group consumer-1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1647257548956-0"
2) 1) "sensor_id"
2) "1234"
3) "temperature"
4) "19.8"
5) "humidity"
6) "56"
# 確認(rèn)消息已處理
> XACK mystream processing-group 1647257548956-0
(integer) 1
# 消費(fèi)者2讀取消息(已無未處理消息)
> XREADGROUP GROUP processing-group consumer-2 COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) (empty list or set)Java Spring Boot示例
@Service
public class ConsumerGroupService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 創(chuàng)建消費(fèi)者組
*/
public void createGroup(String streamKey, String groupName) {
try {
redisTemplate.opsForStream().createGroup(streamKey, groupName);
} catch (RedisSystemException e) {
// 處理流不存在的情況
if (e.getRootCause() instanceof RedisCommandExecutionException
&& e.getRootCause().getMessage().contains("NOGROUP")) {
redisTemplate.opsForStream().createGroup(ReadOffset.from("0"), streamKey, groupName);
} else {
throw e;
}
}
}
/**
* 從消費(fèi)者組讀取消息
*/
public List<MapRecord<String, Object, Object>> readFromGroup(
String streamKey, String groupName, String consumerName, int count) {
StreamReadOptions options = StreamReadOptions.empty().count(count);
return redisTemplate.opsForStream().read(
Consumer.from(groupName, consumerName),
options,
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
);
}
/**
* 阻塞式從消費(fèi)者組讀取消息
*/
public List<MapRecord<String, Object, Object>> readFromGroupBlocking(
String streamKey, String groupName, String consumerName, int count, Duration timeout) {
StreamReadOptions options = StreamReadOptions.empty().count(count).block(timeout);
return redisTemplate.opsForStream().read(
Consumer.from(groupName, consumerName),
options,
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
);
}
/**
* 確認(rèn)消息已處理
*/
public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) {
return redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageIds);
}
}使用場景
- 需要橫向擴(kuò)展消息處理能力的系統(tǒng)
- 要求消息可靠處理的業(yè)務(wù)場景
- 實(shí)現(xiàn)消息工作隊(duì)列
- 微服務(wù)間的事件傳遞
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 多個(gè)消費(fèi)者可以并行處理消息
- 提供消息確認(rèn)機(jī)制,保證消息不丟失
- 支持消費(fèi)者崩潰后恢復(fù)處理
- 每個(gè)消費(fèi)者組維護(hù)獨(dú)立的消費(fèi)位置
缺點(diǎn)
- 實(shí)現(xiàn)相對(duì)復(fù)雜
- 需要妥善管理消費(fèi)者組和消費(fèi)者
- 需要顯式處理消息確認(rèn)
- 需要定期處理未確認(rèn)的消息
3. 阻塞式消費(fèi)模式(Blocking Consumption)
基本概念
阻塞式消費(fèi)允許消費(fèi)者在沒有新消息時(shí)保持連接,等待新消息到達(dá)。這種模式減少了輪詢開銷,提高了實(shí)時(shí)性,適合對(duì)消息處理時(shí)效性要求高的場景。
核心命令
# 阻塞式簡單消費(fèi) XREAD BLOCK milliseconds STREAMS stream_name ID # 阻塞式消費(fèi)者組消費(fèi) XREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >
實(shí)現(xiàn)示例
Redis CLI
# 阻塞等待新消息(最多等待10秒) > XREAD BLOCK 10000 STREAMS mystream $ (nil) # 如果10秒內(nèi)沒有新消息 # 使用消費(fèi)者組的阻塞式消費(fèi) > XREADGROUP GROUP processing-group consumer-1 BLOCK 10000 STREAMS mystream > (nil) # 如果10秒內(nèi)沒有新分配的消息
Java Spring Boot示例
@Service
public class BlockingStreamConsumerService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 阻塞式消息消費(fèi)者任務(wù)
*/
@Async
public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) {
StreamReadOptions options = StreamReadOptions.empty()
.count(1)
.block(timeout);
while (!Thread.currentThread().isInterrupted()) {
try {
// 阻塞讀取消息
List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
.read(options, StreamOffset.from(streamKey, ReadOffset.from(lastId)));
if (records != null && !records.isEmpty()) {
for (MapRecord<String, Object, Object> record : records) {
// 處理消息
processMessage(record);
// 更新最后讀取的ID
lastId = record.getId().getValue();
}
} else {
// 超時(shí)未讀取到消息,可以執(zhí)行一些其他邏輯
}
} catch (Exception e) {
// 異常處理
log.error("Error reading from stream: {}", e.getMessage(), e);
try {
Thread.sleep(1000); // 出錯(cuò)后等待一段時(shí)間再重試
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
/**
* 阻塞式消費(fèi)者組消費(fèi)
*/
@Async
public void startGroupBlockingConsumer(
String streamKey, String groupName, String consumerName, Duration timeout) {
StreamReadOptions options = StreamReadOptions.empty()
.count(1)
.block(timeout);
while (!Thread.currentThread().isInterrupted()) {
try {
// 阻塞讀取消息
List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
.read(Consumer.from(groupName, consumerName),
options,
StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
if (records != null && !records.isEmpty()) {
for (MapRecord<String, Object, Object> record : records) {
try {
// 處理消息
processMessage(record);
// 確認(rèn)消息
redisTemplate.opsForStream()
.acknowledge(streamKey, groupName, record.getId().getValue());
} catch (Exception e) {
// 處理失敗,記錄日志
log.error("Error processing message: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("Error reading from stream group: {}", e.getMessage(), e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
private void processMessage(MapRecord<String, Object, Object> record) {
// 實(shí)際消息處理邏輯
log.info("Processing message: {}", record);
// ...處理消息的具體業(yè)務(wù)邏輯
}
}使用場景
- 實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)
- 事件驅(qū)動(dòng)的任務(wù)處理
- 低延遲要求的應(yīng)用
- 即時(shí)通訊系統(tǒng)
- 通知服務(wù)
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 減少輪詢帶來的資源浪費(fèi)
- 實(shí)時(shí)性好,消息到達(dá)后立即處理
- 降低Redis和客戶端的負(fù)載
- 節(jié)省CPU和網(wǎng)絡(luò)資源
缺點(diǎn)
- 長連接可能占用Redis連接資源
- 需要合理設(shè)置超時(shí)時(shí)間
- 可能需要處理網(wǎng)絡(luò)中斷后的重連
- 消費(fèi)者需要具備并發(fā)處理能力
4. 扇出模式(Fan-out Pattern)
基本概念
扇出模式允許多個(gè)獨(dú)立的消費(fèi)者組同時(shí)消費(fèi)同一個(gè)流中的所有消息,類似于發(fā)布/訂閱模式,但具有消息持久化和回溯能力。每個(gè)消費(fèi)者組獨(dú)立維護(hù)自己的消費(fèi)位置。
核心命令
創(chuàng)建多個(gè)消費(fèi)者組,它們都獨(dú)立消費(fèi)同一個(gè)流:
XGROUP CREATE stream_name group_name_1 $ MKSTREAM XGROUP CREATE stream_name group_name_2 $ MKSTREAM XGROUP CREATE stream_name group_name_3 $ MKSTREAM
實(shí)現(xiàn)示例
Redis CLI
# 創(chuàng)建多個(gè)消費(fèi)者組
> XGROUP CREATE notifications analytics-group $ MKSTREAM
OK
> XGROUP CREATE notifications email-group $ MKSTREAM
OK
> XGROUP CREATE notifications mobile-group $ MKSTREAM
OK
# 添加一條消息
> XADD notifications * type user_signup user_id 1001 email "user@example.com"
"1647345678912-0"
# 從各個(gè)消費(fèi)者組讀?。總€(gè)組都能收到所有消息)
> XREADGROUP GROUP analytics-group analytics-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
2) 1) 1) "1647345678912-0"
2) 1) "type"
2) "user_signup"
3) "user_id"
4) "1001"
5) "email"
6) "user@example.com"
> XREADGROUP GROUP email-group email-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
2) 1) 1) "1647345678912-0"
2) 1) "type"
2) "user_signup"
3) "user_id"
4) "1001"
5) "email"
6) "user@example.com"
> XREADGROUP GROUP mobile-group mobile-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
2) 1) 1) "1647345678912-0"
2) 1) "type"
2) "user_signup"
3) "user_id"
4) "1001"
5) "email"
6) "user@example.com"Java Spring Boot示例
@Service
public class FanOutService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 初始化扇出消費(fèi)者組
*/
public void initializeFanOutGroups(String streamKey, List<String> groupNames) {
// 確保流存在
try {
StreamInfo.XInfoStream info = redisTemplate.opsForStream().info(streamKey);
} catch (Exception e) {
// 流不存在,發(fā)送一個(gè)初始消息
Map<String, Object> initialMessage = new HashMap<>();
initialMessage.put("init", "true");
redisTemplate.opsForStream().add(streamKey, initialMessage);
}
// 創(chuàng)建所有消費(fèi)者組
for (String groupName : groupNames) {
try {
redisTemplate.opsForStream().createGroup(streamKey, groupName);
} catch (Exception e) {
// 忽略組已存在的錯(cuò)誤
log.info("Group {} may already exist: {}", groupName, e.getMessage());
}
}
}
/**
* 發(fā)布扇出消息
*/
public String publishFanOutMessage(String streamKey, Map<String, Object> messageData) {
StringRecord record = StreamRecords.string(messageData).withStreamKey(streamKey);
return redisTemplate.opsForStream().add(record).getValue();
}
/**
* 為特定組啟動(dòng)消費(fèi)者
*/
@Async
public void startGroupConsumer(
String streamKey, String groupName, String consumerName,
Consumer<MapRecord<String, Object, Object>> messageHandler) {
StreamReadOptions options = StreamReadOptions.empty().count(10).block(Duration.ofSeconds(2));
while (!Thread.currentThread().isInterrupted()) {
try {
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
Consumer.from(groupName, consumerName),
options,
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
);
if (messages != null && !messages.isEmpty()) {
for (MapRecord<String, Object, Object> message : messages) {
try {
// 處理消息
messageHandler.accept(message);
// 確認(rèn)消息
redisTemplate.opsForStream().acknowledge(
streamKey, groupName, message.getId().getValue());
} catch (Exception e) {
log.error("Error processing message in group {}: {}",
groupName, e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("Error reading from stream for group {}: {}",
groupName, e.getMessage(), e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}使用示例
@Service
public class NotificationService {
@Autowired
private FanOutService fanOutService;
@PostConstruct
public void init() {
// 初始化扇出組
List<String> groups = Arrays.asList("email-group", "sms-group", "analytics-group");
fanOutService.initializeFanOutGroups("user-events", groups);
// 啟動(dòng)各個(gè)消費(fèi)者組的處理器
fanOutService.startGroupConsumer(
"user-events", "email-group", "email-consumer", this::processEmailNotification);
fanOutService.startGroupConsumer(
"user-events", "sms-group", "sms-consumer", this::processSmsNotification);
fanOutService.startGroupConsumer(
"user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent);
}
private void processEmailNotification(MapRecord<String, Object, Object> message) {
Map<Object, Object> messageData = message.getValue();
log.info("Processing email notification: {}", messageData);
// 郵件發(fā)送邏輯
}
private void processSmsNotification(MapRecord<String, Object, Object> message) {
Map<Object, Object> messageData = message.getValue();
log.info("Processing SMS notification: {}", messageData);
// 短信發(fā)送邏輯
}
private void processAnalyticsEvent(MapRecord<String, Object, Object> message) {
Map<Object, Object> messageData = message.getValue();
log.info("Processing analytics event: {}", messageData);
// 分析事件處理邏輯
}
public void publishUserEvent(String eventType, Map<String, Object> eventData) {
Map<String, Object> message = new HashMap<>(eventData);
message.put("event_type", eventType);
message.put("timestamp", System.currentTimeMillis());
fanOutService.publishFanOutMessage("user-events", message);
}
}使用場景
- 多個(gè)系統(tǒng)需要獨(dú)立處理同一事件流
- 實(shí)現(xiàn)事件廣播機(jī)制
- 系統(tǒng)集成:一個(gè)事件觸發(fā)多個(gè)業(yè)務(wù)流程
- 日志統(tǒng)一處理并分發(fā)到不同服務(wù)
- 通知系統(tǒng):一個(gè)事件需要通過多種方式通知用戶
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 實(shí)現(xiàn)一次發(fā)布多次消費(fèi)
- 各消費(fèi)者組獨(dú)立工作,互不影響
- 新增消費(fèi)者組可以從頭開始消費(fèi)所有歷史消息
- 可靠性高,消息持久化存儲(chǔ)
缺點(diǎn)
- 隨著流數(shù)據(jù)增長,可能占用較多存儲(chǔ)空間
- 需要合理設(shè)置流的最大長度或過期策略
- 消費(fèi)者組數(shù)量過多可能增加Redis負(fù)載
- 需要單獨(dú)管理每個(gè)消費(fèi)者組的狀態(tài)
5. 重試與恢復(fù)模式(Retry and Recovery)
基本概念
這種模式關(guān)注處理失敗消息的恢復(fù)和重試機(jī)制。Redis Stream消費(fèi)者組會(huì)跟蹤每個(gè)消息的處理狀態(tài),允許查看和管理未確認(rèn)(PEL - Pending Entry List)的消息,實(shí)現(xiàn)可靠的消息處理。
核心命令
# 查看消費(fèi)者組中未確認(rèn)的消息 XPENDING stream_name group_name [start_id end_id count] [consumer_name] # 查看消費(fèi)者組中長時(shí)間未確認(rèn)的消息詳情 XPENDING stream_name group_name start_id end_id count [consumer_name] # 認(rèn)領(lǐng)處理超時(shí)的消息 XCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]
實(shí)現(xiàn)示例
Redis CLI
# 查看未確認(rèn)的消息數(shù)量
> XPENDING mystream processing-group
1) (integer) 2 # 未確認(rèn)消息數(shù)量
2) "1647257548956-0" # 最小ID
3) "1647257549123-0" # 最大ID
4) 1) 1) "consumer-1" # 各個(gè)消費(fèi)者的未確認(rèn)消息數(shù)
2) (integer) 1
2) 1) "consumer-2"
2) (integer) 1
# 查看特定消費(fèi)者的未確認(rèn)消息
> XPENDING mystream processing-group - + 10 consumer-1
1) 1) "1647257548956-0" # 消息ID
2) "consumer-1" # 當(dāng)前持有的消費(fèi)者
3) (integer) 120000 # 空閑時(shí)間(毫秒)
4) (integer) 2 # 傳遞次數(shù)
# 認(rèn)領(lǐng)超過2分鐘未處理的消息
> XCLAIM mystream processing-group consumer-2 120000 1647257548956-0
1) 1) "1647257548956-0"
2) 1) "sensor_id"
2) "1234"
3) "temperature"
4) "19.8"
5) "humidity"
6) "56"Java Spring Boot示例
@Service
public class MessageRecoveryService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 獲取消費(fèi)者組中的未確認(rèn)消息
*/
public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) {
return redisTemplate.opsForStream().pending(streamKey, groupName);
}
/**
* 獲取指定消費(fèi)者的詳細(xì)未確認(rèn)消息
*/
public PendingMessages getPendingMessages(
String streamKey, String groupName, String consumerName,
Range<String> idRange, long count) {
return redisTemplate.opsForStream().pending(
streamKey,
Consumer.from(groupName, consumerName),
idRange,
count);
}
/**
* 認(rèn)領(lǐng)長時(shí)間未處理的消息
*/
public List<MapRecord<String, Object, Object>> claimMessages(
String streamKey, String groupName, String newConsumerName,
Duration minIdleTime, String... messageIds) {
return redisTemplate.opsForStream().claim(
streamKey,
Consumer.from(groupName, newConsumerName),
minIdleTime,
messageIds);
}
/**
* 定時(shí)檢查和恢復(fù)未處理的消息
*/
@Scheduled(fixedRate = 60000) // 每分鐘執(zhí)行一次
public void recoverStaleMessages() {
// 配置參數(shù)
String streamKey = "mystream";
String groupName = "processing-group";
String recoveryConsumer = "recovery-consumer";
Duration minIdleTime = Duration.ofMinutes(5); // 超過5分鐘未處理的消息
try {
// 1. 獲取所有未確認(rèn)消息的摘要
PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName);
if (summary != null && summary.getTotalPendingMessages() > 0) {
// 2. 遍歷每個(gè)消費(fèi)者的未確認(rèn)消息
for (Consumer consumer : summary.getPendingMessagesPerConsumer().keySet()) {
// 獲取該消費(fèi)者的詳細(xì)未確認(rèn)消息列表
PendingMessages pendingMessages = getPendingMessages(
streamKey, groupName, consumer.getName(),
Range.unbounded(), 50); // 每次最多處理50條
if (pendingMessages != null) {
// 3. 篩選出空閑時(shí)間超過閾值的消息
List<String> staleMessageIds = new ArrayList<>();
for (PendingMessage message : pendingMessages) {
if (message.getElapsedTimeSinceLastDelivery().compareTo(minIdleTime) > 0) {
staleMessageIds.add(message.getIdAsString());
}
}
// 4. 認(rèn)領(lǐng)這些消息
if (!staleMessageIds.isEmpty()) {
log.info("Claiming {} stale messages from consumer {}",
staleMessageIds.size(), consumer.getName());
List<MapRecord<String, Object, Object>> claimedMessages = claimMessages(
streamKey, groupName, recoveryConsumer, minIdleTime,
staleMessageIds.toArray(new String[0]));
// 5. 處理這些被認(rèn)領(lǐng)的消息
processClaimedMessages(streamKey, groupName, claimedMessages);
}
}
}
}
} catch (Exception e) {
log.error("Error recovering stale messages: {}", e.getMessage(), e);
}
}
/**
* 處理被認(rèn)領(lǐng)的消息
*/
private void processClaimedMessages(
String streamKey, String groupName,
List<MapRecord<String, Object, Object>> messages) {
if (messages == null || messages.isEmpty()) {
return;
}
for (MapRecord<String, Object, Object> message : messages) {
try {
// 執(zhí)行消息處理邏輯
processMessage(message);
// 確認(rèn)消息
redisTemplate.opsForStream().acknowledge(
streamKey, groupName, message.getId().getValue());
log.info("Successfully processed recovered message: {}", message.getId());
} catch (Exception e) {
log.error("Failed to process recovered message {}: {}",
message.getId(), e.getMessage(), e);
// 根據(jù)業(yè)務(wù)需求決定是否將消息加入死信隊(duì)列
moveToDeadLetterQueue(streamKey, message);
}
}
}
/**
* 將消息移至死信隊(duì)列
*/
private void moveToDeadLetterQueue(String sourceStream, MapRecord<String, Object, Object> message) {
String deadLetterStream = sourceStream + ":dead-letter";
Map<Object, Object> messageData = message.getValue();
Map<String, Object> dlqMessage = new HashMap<>();
messageData.forEach((k, v) -> dlqMessage.put(k.toString(), v));
// 添加元數(shù)據(jù)
dlqMessage.put("original_id", message.getId().getValue());
dlqMessage.put("error_time", System.currentTimeMillis());
redisTemplate.opsForStream().add(deadLetterStream, dlqMessage);
// 可選:從原消費(fèi)者組確認(rèn)該消息
// redisTemplate.opsForStream().acknowledge(sourceStream, groupName, message.getId().getValue());
}
private void processMessage(MapRecord<String, Object, Object> message) {
// 實(shí)際的消息處理邏輯
log.info("Processing recovered message: {}", message);
// ...
}
}使用場景
- 需要可靠消息處理的關(guān)鍵業(yè)務(wù)系統(tǒng)
- 處理時(shí)間較長的任務(wù)
- 需要錯(cuò)誤重試機(jī)制的工作流
- 監(jiān)控和診斷消息處理過程
- 實(shí)現(xiàn)死信隊(duì)列處理特定失敗場景
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 提高系統(tǒng)容錯(cuò)性和可靠性
- 自動(dòng)恢復(fù)因消費(fèi)者崩潰導(dǎo)致的未處理消息
- 可以識(shí)別和處理長時(shí)間未確認(rèn)的消息
- 支持實(shí)現(xiàn)復(fù)雜的重試策略和死信處理
缺點(diǎn)
- 需要額外開發(fā)和維護(hù)恢復(fù)機(jī)制
- 可能導(dǎo)致消息重復(fù)處理,需要確保業(yè)務(wù)邏輯冪等
- 系統(tǒng)復(fù)雜度增加
- 需要監(jiān)控和管理PEL(未確認(rèn)消息列表)的大小
6. 流處理窗口模式(Streaming Window Processing)
基本概念
流處理窗口模式基于時(shí)間或消息計(jì)數(shù)劃分?jǐn)?shù)據(jù)流,在每個(gè)窗口內(nèi)執(zhí)行聚合或分析操作。這種模式適用于實(shí)時(shí)分析、趨勢監(jiān)測和時(shí)間序列處理。雖然Redis Stream本身不直接提供窗口操作,但可以結(jié)合Redis的其他特性實(shí)現(xiàn)。
實(shí)現(xiàn)方式
主要通過以下幾種方式實(shí)現(xiàn):
1. 基于消息ID的時(shí)間范圍(Redis消息ID包含毫秒時(shí)間戳)
2. 結(jié)合Redis的排序集合(SortedSet)存儲(chǔ)窗口數(shù)據(jù)
3. 使用Redis的過期鍵實(shí)現(xiàn)滑動(dòng)窗口
實(shí)現(xiàn)示例
Redis CLI
窗口數(shù)據(jù)收集與查詢:
# 添加帶時(shí)間戳的數(shù)據(jù)
> XADD temperature * sensor_id 1 value 21.5 timestamp 1647257548000
"1647257550123-0"
> XADD temperature * sensor_id 1 value 21.8 timestamp 1647257558000
"1647257560234-0"
> XADD temperature * sensor_id 1 value 22.1 timestamp 1647257568000
"1647257570345-0"
# 查詢特定時(shí)間范圍的數(shù)據(jù)
> XRANGE temperature 1647257550000-0 1647257570000-0
1) 1) "1647257550123-0"
2) 1) "sensor_id"
2) "1"
3) "value"
4) "21.5"
5) "timestamp"
6) "1647257548000"
2) 1) "1647257560234-0"
2) 1) "sensor_id"
2) "1"
3) "value"
4) "21.8"
5) "timestamp"
6) "1647257558000"Java Spring Boot示例
@Service
public class TimeWindowProcessingService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 添加數(shù)據(jù)點(diǎn)到流,并存儲(chǔ)到相應(yīng)的時(shí)間窗口
*/
public String addDataPoint(String streamKey, String sensorId, double value) {
long timestamp = System.currentTimeMillis();
// 1. 添加到原始數(shù)據(jù)流
Map<String, Object> dataPoint = new HashMap<>();
dataPoint.put("sensor_id", sensorId);
dataPoint.put("value", String.valueOf(value));
dataPoint.put("timestamp", String.valueOf(timestamp));
StringRecord record = StreamRecords.string(dataPoint).withStreamKey(streamKey);
RecordId recordId = redisTemplate.opsForStream().add(record);
// 2. 計(jì)算所屬的窗口(這里以5分鐘為一個(gè)窗口)
long windowStart = timestamp - (timestamp % (5 * 60 * 1000));
String windowKey = streamKey + ":window:" + windowStart;
// 3. 將數(shù)據(jù)點(diǎn)添加到窗口的有序集合中,分?jǐn)?shù)為時(shí)間戳
String dataPointJson = new ObjectMapper().writeValueAsString(dataPoint);
redisTemplate.opsForZSet().add(windowKey, dataPointJson, timestamp);
// 4. 設(shè)置窗口鍵的過期時(shí)間(保留24小時(shí))
redisTemplate.expire(windowKey, Duration.ofHours(24));
return recordId.getValue();
}
/**
* 獲取指定時(shí)間窗口內(nèi)的數(shù)據(jù)點(diǎn)
*/
public List<Map<String, Object>> getWindowData(
String streamKey, long windowStartTime, long windowEndTime) {
// 計(jì)算可能的窗口鍵(每5分鐘一個(gè)窗口)
List<String> windowKeys = new ArrayList<>();
long current = windowStartTime - (windowStartTime % (5 * 60 * 1000));
while (current <= windowEndTime) {
windowKeys.add(streamKey + ":window:" + current);
current += (5 * 60 * 1000);
}
// 從各個(gè)窗口獲取數(shù)據(jù)點(diǎn)
List<Map<String, Object>> results = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
for (String windowKey : windowKeys) {
Set<String> dataPoints = redisTemplate.opsForZSet().rangeByScore(
windowKey, windowStartTime, windowEndTime);
if (dataPoints != null) {
for (String dataPointJson : dataPoints) {
try {
Map<String, Object> dataPoint = mapper.readValue(
dataPointJson, new TypeReference<Map<String, Object>>() {});
results.add(dataPoint);
} catch (Exception e) {
log.error("Error parsing data point: {}", e.getMessage(), e);
}
}
}
}
// 按時(shí)間戳排序
results.sort(Comparator.comparing(dp -> Long.parseLong(dp.get("timestamp").toString())));
return results;
}
/**
* 計(jì)算窗口內(nèi)數(shù)據(jù)的聚合統(tǒng)計(jì)
*/
public Map<String, Object> getWindowStats(
String streamKey, String sensorId, long windowStartTime, long windowEndTime) {
List<Map<String, Object>> windowData = getWindowData(streamKey, windowStartTime, windowEndTime);
// 過濾特定傳感器的數(shù)據(jù)
List<Double> values = windowData.stream()
.filter(dp -> sensorId.equals(dp.get("sensor_id").toString()))
.map(dp -> Double.parseDouble(dp.get("value").toString()))
.collect(Collectors.toList());
Map<String, Object> stats = new HashMap<>();
stats.put("count", values.size());
if (!values.isEmpty()) {
DoubleSummaryStatistics summaryStats = values.stream().collect(Collectors.summarizingDouble(v -> v));
stats.put("min", summaryStats.getMin());
stats.put("max", summaryStats.getMax());
stats.put("avg", summaryStats.getAverage());
stats.put("sum", summaryStats.getSum());
}
stats.put("start_time", windowStartTime);
stats.put("end_time", windowEndTime);
stats.put("sensor_id", sensorId);
return stats;
}
/**
* 實(shí)現(xiàn)滑動(dòng)窗口處理
*/
@Scheduled(fixedRate = 60000) // 每分鐘執(zhí)行一次
public void processSlidingWindows() {
String streamKey = "temperature";
long now = System.currentTimeMillis();
// 處理過去10分鐘窗口的數(shù)據(jù)
long windowEndTime = now;
long windowStartTime = now - (10 * 60 * 1000);
List<String> sensorIds = Arrays.asList("1", "2", "3"); // 示例傳感器ID
for (String sensorId : sensorIds) {
try {
// 獲取窗口統(tǒng)計(jì)
Map<String, Object> stats = getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime);
// 根據(jù)統(tǒng)計(jì)結(jié)果執(zhí)行業(yè)務(wù)邏輯
if (stats.containsKey("avg")) {
double avgTemp = (double) stats.get("avg");
if (avgTemp > 25.0) {
// 觸發(fā)高溫警報(bào)
log.warn("High temperature alert for sensor {}: {} °C", sensorId, avgTemp);
triggerAlert(sensorId, "HIGH_TEMP", avgTemp);
}
}
// 存儲(chǔ)聚合結(jié)果用于歷史趨勢分析
saveAggregatedResults(streamKey, sensorId, stats);
} catch (Exception e) {
log.error("Error processing sliding window for sensor {}: {}",
sensorId, e.getMessage(), e);
}
}
}
/**
* 觸發(fā)警報(bào)
*/
private void triggerAlert(String sensorId, String alertType, double value) {
Map<String, Object> alertData = new HashMap<>();
alertData.put("sensor_id", sensorId);
alertData.put("alert_type", alertType);
alertData.put("value", value);
alertData.put("timestamp", System.currentTimeMillis());
redisTemplate.opsForStream().add("alerts", alertData);
}
/**
* 保存聚合結(jié)果
*/
private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats) {
long windowTime = (long) stats.get("end_time");
String aggregateKey = streamKey + ":aggregate:" + sensorId;
// 使用時(shí)間作為分?jǐn)?shù)存儲(chǔ)聚合結(jié)果
redisTemplate.opsForZSet().add(
aggregateKey,
new ObjectMapper().writeValueAsString(stats),
windowTime);
// 保留30天的聚合數(shù)據(jù)
redisTemplate.expire(aggregateKey, Duration.ofDays(30));
}
}使用場景
- 實(shí)時(shí)數(shù)據(jù)分析與統(tǒng)計(jì)
- 趨勢檢測和預(yù)測
- 異常值和閾值監(jiān)控
- 時(shí)間序列數(shù)據(jù)處理
- IoT數(shù)據(jù)流處理和聚合
- 用戶行為分析
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 支持基于時(shí)間的數(shù)據(jù)分析
- 可以實(shí)現(xiàn)實(shí)時(shí)聚合和計(jì)算
- 靈活的窗口定義(滑動(dòng)窗口、滾動(dòng)窗口)
- 可擴(kuò)展以支持復(fù)雜的分析場景
缺點(diǎn)
- 實(shí)現(xiàn)復(fù)雜度較高
- 可能需要額外的數(shù)據(jù)結(jié)構(gòu)和存儲(chǔ)空間
- 對(duì)于大數(shù)據(jù)量的窗口計(jì)算可能影響性能
- 需要小心管理內(nèi)存使用和數(shù)據(jù)過期策略
結(jié)論
Redis Stream提供了強(qiáng)大而靈活的消息處理功能,通過組合這些模式,可以構(gòu)建出高性能、可靠且靈活的消息處理系統(tǒng),滿足從簡單的任務(wù)隊(duì)列到復(fù)雜的實(shí)時(shí)數(shù)據(jù)處理等各種應(yīng)用需求。
在選擇和實(shí)現(xiàn)這些模式時(shí),應(yīng)充分考慮業(yè)務(wù)特性、性能需求、可靠性要求以及系統(tǒng)規(guī)模,結(jié)合Redis Stream的特性,打造最適合自己應(yīng)用場景的消息處理解決方案。
以上就是一文帶你搞懂Redis Stream的6種消息處理模式的詳細(xì)內(nèi)容,更多關(guān)于Redis Stream消息處理模式的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Redis?鍵值對(duì)(key-value)數(shù)據(jù)庫實(shí)現(xiàn)方法
Redis 的鍵值對(duì)中的 key 就是字符串對(duì)象,而 value 可以是字符串對(duì)象,也可以是集合數(shù)據(jù)類型的對(duì)象,比如 List 對(duì)象,Hash 對(duì)象、Set 對(duì)象和 Zset 對(duì)象,這篇文章主要介紹了Redis?鍵值對(duì)數(shù)據(jù)庫是怎么實(shí)現(xiàn)的,需要的朋友可以參考下2024-05-05
Windows系統(tǒng)安裝Redis的詳細(xì)圖文教程
但有時(shí)候想在windows下折騰下Redis,那么就可以參考下面的方法了,雖然腳本之家小編以前整理了一些,發(fā)現(xiàn)這篇做的比較詳細(xì),下載也給出來了2018-08-08
ubuntu 16.04安裝redis的兩種方式教程詳解(apt和編譯方式)
這篇文章主要介紹了ubuntu 16.04安裝redis的兩種方式教程詳解(apt和編譯方式),需要的朋友可以參考下2018-03-03
RedisTemplate中boundHashOps的使用小結(jié)
redisTemplate.boundHashOps(key)?是 RedisTemplate 類的一個(gè)方法,本文主要介紹了RedisTemplate中boundHashOps的使用小結(jié),具有一定的參考價(jià)值,感興趣的可以了解一下2024-04-04
使用RedisAtomicInteger計(jì)數(shù)出現(xiàn)少計(jì)問題及解決
這篇文章主要介紹了使用RedisAtomicInteger計(jì)數(shù)出現(xiàn)少計(jì)問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-11-11
Redis 如何批量設(shè)置過期時(shí)間(PIPLINE的使用)
有時(shí)候我們并不希望redis的key一直存在。例如緩存,驗(yàn)證碼等數(shù)據(jù),我們希望它們能在一定時(shí)間內(nèi)自動(dòng)的被銷毀。本文就詳細(xì)的介紹一下Redis 如何批量設(shè)置過期時(shí)間,感興趣的可以了解一下2021-11-11
Redis在項(xiàng)目中的使用(JedisPool方式)
項(xiàng)目操作redis是使用的RedisTemplate方式,另外還可以完全使用JedisPool和Jedis來操作redis,本文給大家介紹Redis在項(xiàng)目中的使用,JedisPool方式,感興趣的朋友跟隨小編一起看看吧2021-12-12
Redis所實(shí)現(xiàn)的Reactor模型設(shè)計(jì)方案
這篇文章主要介紹了Redis所實(shí)現(xiàn)的Reactor模型,本文將帶領(lǐng)讀者從源碼的角度來查看redis關(guān)于reactor模型的設(shè)計(jì),需要的朋友可以參考下2024-06-06
詳解Redis中地理位置功能Geospatial的應(yīng)用
Geospatial?Indexes?是?Redis?提供的一種數(shù)據(jù)結(jié)構(gòu),用于存儲(chǔ)和查詢地理位置信息,這篇文章就來和大家詳細(xì)講講Geospatial的具體應(yīng)用吧2023-06-06

