Java 消息的可靠性投遞實(shí)踐建議
1.核心概念
可靠性投遞(Reliable Delivery)是指確保消息從生產(chǎn)者成功到達(dá)消費(fèi)者,即使面對(duì)網(wǎng)絡(luò)故障、系統(tǒng)崩潰等異常情況也能保證不丟失、不重復(fù)、按順序(部分場(chǎng)景)傳遞。
2.面臨的挑戰(zhàn)
- 網(wǎng)絡(luò)不可靠:丟包、延遲、分區(qū)
- 節(jié)點(diǎn)故障:生產(chǎn)者/消費(fèi)者/中間件宕機(jī)
- 重復(fù)消費(fèi):確認(rèn)機(jī)制可能引發(fā)重復(fù)
- 順序保證:分布式環(huán)境下消息亂序
3.關(guān)鍵實(shí)現(xiàn)機(jī)制
3.1生產(chǎn)端保證
// 偽代碼示例:生產(chǎn)端確認(rèn)模式
public void sendWithConfirm(Message msg) {
// 1. 持久化到本地?cái)?shù)據(jù)庫(kù)(防丟失)
messageDao.save(msg);
// 2. 發(fā)送到消息隊(duì)列
String msgId = rabbitTemplate.convertAndSend(msg);
// 3. 等待Broker確認(rèn)
boolean ack = waitForAck(msgId, TIMEOUT);
// 4. 失敗重試(指數(shù)退避)
if (!ack) {
retryWithBackoff(msg);
}
// 5. 最終記錄投遞狀態(tài)
updateDeliveryStatus(msgId, ack);
}技術(shù)要點(diǎn):
- 事務(wù)機(jī)制:同步方式,性能差(不推薦)
- 確認(rèn)機(jī)制(Confirm):
- 普通確認(rèn)(每消息確認(rèn))
- 批量確認(rèn)(提高吞吐)
- 異步監(jiān)聽(tīng)(最佳實(shí)踐)
- 本地消息表:事務(wù)消息的替代方案
- 消息持久化:設(shè)置
delivery_mode=2
3.2Broker端保證
消息處理流程: Producer → Broker接收 → 持久化存儲(chǔ) → 推送給Consumer → 等待ACK → 刪除/重投
持久化策略:
- 隊(duì)列持久化:
durable=true - 消息持久化:
delivery_mode=2 - 鏡像隊(duì)列:多副本冗余(RabbitMQ)
- 高可用集群:主從切換時(shí)不丟消息
3.3消費(fèi)端保證
// 消費(fèi)端保證示例
@RabbitListener(queues = "order.queue")
public void handleOrder(OrderMessage order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 1. 業(yè)務(wù)處理
orderService.process(order);
// 2. 手動(dòng)確認(rèn)(成功才ACK)
channel.basicAck(tag, false);
// 3. 更新消費(fèi)記錄
consumeRecordService.markConsumed(order.getId());
} catch (Exception e) {
// 4. 失敗處理:重試或進(jìn)入死信隊(duì)列
if (retryCount < MAX_RETRY) {
channel.basicNack(tag, false, true); // 重入隊(duì)列
} else {
channel.basicNack(tag, false, false); // 進(jìn)入死信隊(duì)列
alarmService.notifyAdmin(order, e);
}
}
}消費(fèi)端關(guān)鍵點(diǎn):
- 手動(dòng)ACK:避免自動(dòng)確認(rèn)導(dǎo)致消息丟失
- 冪等性設(shè)計(jì):
public boolean processWithIdempotent(String msgId) {
// 基于消息ID去重
if (redis.exists("processed:" + msgId)) {
return true; // 已處理過(guò)
}
// 業(yè)務(wù)處理
boolean success = doBusinessLogic();
// 記錄處理狀態(tài)
if (success) {
redis.setex("processed:" + msgId, 24h, "1");
}
return success;
}- 死信隊(duì)列(DLQ):處理無(wú)法消費(fèi)的消息
- 消費(fèi)重試策略:
- 立即重試(瞬時(shí)故障)
- 延遲重試(業(yè)務(wù)依賴)
- 指數(shù)退避(防止雪崩)
4.完整可靠性方案
4.1事務(wù)消息方案(如RocketMQ)
兩階段提交: 1. 發(fā)送Half Message(預(yù)備消息) 2. 執(zhí)行本地事務(wù) 3. 根據(jù)本地事務(wù)結(jié)果Commit/Rollback 4. Broker檢查事務(wù)狀態(tài)并投遞/丟棄
4.2最大努力投遞方案
# 補(bǔ)償機(jī)制實(shí)現(xiàn)
def reliable_delivery(message):
max_retries = 5
for attempt in range(max_retries):
try:
# 嘗試投遞
result = mq_client.send(message)
if result.confirmed:
log_delivery_success(message.id)
return True
except Exception as e:
log_failure(attempt, e)
if attempt == max_retries - 1:
# 最終失敗,人工介入
send_alert_to_admin(message)
save_to_compensation_table(message)
return False
# 等待后重試
sleep(backoff_time(attempt))
return False4.3本地消息表方案(經(jīng)典)
-- 本地消息表結(jié)構(gòu)
CREATE TABLE local_message (
id BIGINT PRIMARY KEY,
biz_id VARCHAR(64), -- 業(yè)務(wù)ID
content TEXT, -- 消息內(nèi)容
status TINYINT, -- 0:待發(fā)送, 1:已發(fā)送, 2:已確認(rèn)
retry_count INT,
next_retry_time DATETIME,
created_at TIMESTAMP
);工作流程:
- 業(yè)務(wù)數(shù)據(jù)+消息記錄原子性寫入本地DB
- 定時(shí)任務(wù)掃描待發(fā)送消息
- 調(diào)用MQ發(fā)送,成功后更新?tīng)顟B(tài)
- 消費(fèi)者處理完成后反向確認(rèn)
- 對(duì)賬程序定期校驗(yàn)數(shù)據(jù)一致性
5.高級(jí)特性與優(yōu)化
5.1順序性保證
- 全局有序:?jiǎn)侮?duì)列單消費(fèi)者(性能低)
- 局部有序:相同sharding key的消息發(fā)到同一隊(duì)列
- 犧牲場(chǎng)景:重試隊(duì)列可能破壞順序
5.2批量消息可靠性
// 批量消息的可靠性處理
public class BatchMessageReliableSender {
public void sendBatch(List<Message> batch) {
// 1. 批量持久化到本地
batchMessageDao.saveAll(batch);
// 2. 設(shè)置批次ID
String batchId = generateBatchId();
// 3. 發(fā)送批次消息
boolean success = mqTemplate.sendBatch(batchId, batch);
// 4. 批次確認(rèn)(或單條補(bǔ)償)
if (success) {
markBatchDelivered(batchId);
} else {
// 逐條重試或記錄異常
compensateFailedMessages(batch);
}
}
}5.3監(jiān)控與對(duì)賬
- 實(shí)時(shí)監(jiān)控:
- 堆積情況監(jiān)控
- 消費(fèi)延遲報(bào)警
- 失敗率統(tǒng)計(jì)
- 定期對(duì)賬:
-- 消息對(duì)賬SQL示例 SELECT DATE(create_time) as day, COUNT(*) as total_sent, SUM(CASE WHEN status=2 THEN 1 ELSE 0 END) as confirmed, SUM(CASE WHEN status=1 THEN 1 ELSE 0 END) as pending FROM message_record GROUP BY DATE(create_time) HAVING total_sent != confirmed;
6.不同MQ的實(shí)現(xiàn)差異
| 特性 | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
| 可靠性機(jī)制 | 確認(rèn)+持久化+鏡像隊(duì)列 | 副本機(jī)制+ACK+Exactly-Once | 事務(wù)消息+本地存儲(chǔ) |
| 順序性 | 單隊(duì)列保證 | Partition內(nèi)有序 | Queue內(nèi)有序 |
| 事務(wù)支持 | 輕量級(jí)事務(wù)(性能差) | 支持Exactly-Once語(yǔ)義 | 完整事務(wù)消息 |
| 最佳適用場(chǎng)景 | 業(yè)務(wù)消息、高可靠要求 | 日志流、大數(shù)據(jù)場(chǎng)景 | 金融交易、訂單業(yè)務(wù) |
7.實(shí)踐建議
- 分級(jí)可靠性策略:
- 關(guān)鍵業(yè)務(wù):事務(wù)消息+本地表+對(duì)賬
- 普通業(yè)務(wù):確認(rèn)機(jī)制+重試+死信隊(duì)列
- 日志類:最多一次投遞即可
- 性能與可靠性的平衡:
- 同步刷盤 vs 異步刷盤
- 同步復(fù)制 vs 異步復(fù)制
- 根據(jù)業(yè)務(wù)重要性選擇配置
- 災(zāi)難恢復(fù)設(shè)計(jì):
# 配置示例:多級(jí)降級(jí)
mq:
primary:
url: "amqp://primary"
timeout: 1000ms
secondary:
url: "amqp://secondary"
timeout: 2000ms
fallback-to-db: true # 最終降級(jí)到數(shù)據(jù)庫(kù)總結(jié)
消息的可靠性投遞是一個(gè)系統(tǒng)工程,需要在生產(chǎn)端、Broker端、消費(fèi)端協(xié)同設(shè)計(jì),結(jié)合業(yè)務(wù)場(chǎng)景、性能要求、成本約束做出合適的選擇。沒(méi)有"銀彈"方案,只有最適合的方案。建議從簡(jiǎn)單方案開(kāi)始,隨著業(yè)務(wù)復(fù)雜度增加逐步引入更完善的可靠性機(jī)制。
面試回答
首先,消息可靠性投遞指的是:
一個(gè)消息從發(fā)送到被消費(fèi)者成功處理,過(guò)程中不會(huì)丟失或重復(fù),保證最終數(shù)據(jù)的一致性。在實(shí)際系統(tǒng)里,消息可能因?yàn)榫W(wǎng)絡(luò)問(wèn)題、服務(wù)重啟等原因丟失或重復(fù),所以我們需要一套機(jī)制來(lái)確??煽?。
為什么需要它呢?
比如在訂單系統(tǒng)中,用戶支付成功后要通知物流系統(tǒng),如果消息丟了,物流就不會(huì)觸發(fā),用戶體驗(yàn)就受損;如果消息重復(fù),可能重復(fù)發(fā)貨,造成損失。所以像金融、交易這些場(chǎng)景,可靠性特別重要。
常見(jiàn)的實(shí)現(xiàn)方式,我了解的有幾種:
- 生產(chǎn)者確認(rèn)機(jī)制
生產(chǎn)者發(fā)消息后,MQ(比如RabbitMQ)會(huì)返回一個(gè)確認(rèn)(ACK),如果沒(méi)收到ACK,生產(chǎn)者可以重發(fā)。這樣可以防止消息在發(fā)送階段丟失。 - 消息持久化
消息保存到磁盤,而不是只放在內(nèi)存。這樣即使MQ重啟,消息也不會(huì)丟。 - 消費(fèi)者手動(dòng)ACK
消費(fèi)者處理完消息后,手動(dòng)告訴MQ“我已經(jīng)處理完了”,MQ才刪除消息;如果處理失敗,MQ可以把消息重新投遞給其他消費(fèi)者。避免消息在處理階段丟失。 - 事務(wù)消息(比如RocketMQ)
先發(fā)一個(gè)“半消息”,等本地事務(wù)執(zhí)行成功,再確認(rèn)投遞;如果失敗,就回滾。這適用于分布式事務(wù)場(chǎng)景。 - 消息去重
為了避免重復(fù)消費(fèi),可以在消費(fèi)端做冪等性設(shè)計(jì)。比如在數(shù)據(jù)庫(kù)里記錄消息ID,每次處理前先查一下是否已經(jīng)處理過(guò)。
實(shí)際中我們一般會(huì)結(jié)合業(yè)務(wù)來(lái)設(shè)計(jì)。
比如一個(gè)訂單狀態(tài)同步的場(chǎng)景,我可能會(huì)用:生產(chǎn)者確認(rèn) + 消息持久化 + 消費(fèi)者手動(dòng)ACK + 消費(fèi)端冪等性。這樣基本能覆蓋發(fā)送、存儲(chǔ)、消費(fèi)各個(gè)環(huán)節(jié)的可靠性。
當(dāng)然,可靠性和性能之間需要權(quán)衡,比如持久化會(huì)降低吞吐量,手動(dòng)ACK會(huì)增加延遲。所以要根據(jù)業(yè)務(wù)需求來(lái)選擇合適的方案。
追加:遇到過(guò)消息丟失或重復(fù)的問(wèn)題,你是怎么排查和解決的?
追加:是否了解最終一致性、最大努力通知等模式 ?
到此這篇關(guān)于Java 消息的可靠性投遞的文章就介紹到這了,更多相關(guān)java消息的可靠性投遞內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot啟動(dòng)類和啟動(dòng)過(guò)程超詳細(xì)講解
springboot的啟動(dòng)經(jīng)過(guò)了一些一系列的處理,下面這篇文章主要介紹了Springboot啟動(dòng)類和啟動(dòng)過(guò)程的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2025-09-09
SpringBoot統(tǒng)一功能處理示例詳解(攔截器)
這篇文章主要介紹了SpringBoot統(tǒng)一功能處理(攔截器),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
MyBatis-Plus不使用數(shù)據(jù)庫(kù)默認(rèn)值的問(wèn)題及解決
這篇文章主要介紹了MyBatis-Plus不使用數(shù)據(jù)庫(kù)默認(rèn)值的問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
SpringBoot項(xiàng)目application.yml文件數(shù)據(jù)庫(kù)配置密碼加密的方法
這篇文章主要介紹了SpringBoot項(xiàng)目application.yml文件數(shù)據(jù)庫(kù)配置密碼加密的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03
使用多個(gè)servlet時(shí)Spring security需要指明路由匹配策略問(wèn)題
這篇文章主要介紹了使用多個(gè)servlet時(shí)Spring security需要指明路由匹配策略問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
解決java.lang.Error: Unresolved compilation pro
這篇文章主要介紹了解決java.lang.Error: Unresolved compilation problems:問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03

