SpringBoot4.0整合RabbitMQ死信隊(duì)列詳解
為啥那么講解死信隊(duì)列,因?yàn)楹枚嗳瞬粫?huì)使用,不知道什么場(chǎng)景下使用,此案例是我在公司實(shí)現(xiàn)的一種方式,讓大家都可以學(xué)習(xí)到
一、死信隊(duì)列的好處
1.提高系統(tǒng)可靠性
- 避免消息丟失,確保處理失敗的消息有備份
- 防止因消息處理異常導(dǎo)致的消息無限重試
2.異常消息管理
- 將異常消息與正常消息分離
- 便于監(jiān)控和排查問題消息
3.靈活的重試機(jī)制
- 支持延遲重試
- 可設(shè)置不同的重試策略
4.系統(tǒng)解耦
- 業(yè)務(wù)邏輯與異常處理邏輯分離
- 提高代碼的可維護(hù)性
二、注解式配置說明
1.主配置注解
@Configuration
public class RabbitMQConfig {
// 主隊(duì)列
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.deadLetterExchange("dlx.exchange") // 死信交換器
.deadLetterRoutingKey("dlx.routing.key") // 死信路由鍵
.ttl(10000) // 消息10秒未消費(fèi)進(jìn)入死信
.maxLength(1000) // 隊(duì)列最大長(zhǎng)度
.build();
}
// 死信隊(duì)列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dl.queue")
.build();
}
// 死信交換器
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dlx.exchange");
}
// 綁定死信交換器和隊(duì)列
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dlx.routing.key");
}
}
2.監(jiān)聽器注解
@Component
public class OrderMessageListener {
// 監(jiān)聽正常隊(duì)列
@RabbitListener(queues = "order.queue")
public void processOrderMessage(OrderDTO order,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 業(yè)務(wù)處理邏輯
if (processOrder(order)) {
// 手動(dòng)確認(rèn)
channel.basicAck(tag, false);
} else {
// 拒絕消息,進(jìn)入死信隊(duì)列
channel.basicNack(tag, false, false);
}
} catch (Exception e) {
// 異常時(shí)拒絕
channel.basicNack(tag, false, false);
}
}
// 監(jiān)聽死信隊(duì)列
@RabbitListener(queues = "dl.queue")
public void processDeadLetter(OrderDTO order) {
log.error("收到死信消息: {}", order);
// 死信消息處理邏輯
handleDeadLetter(order);
}
}
三、詳細(xì)整合步驟
1.添加依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置屬性
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 開啟消息返回機(jī)制
publisher-returns: true
# 開啟確認(rèn)機(jī)制
publisher-confirm-type: correlated
listener:
simple:
# 手動(dòng)確認(rèn)
acknowledge-mode: manual
# 重試配置
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
3.完整配置類
@Configuration
@Slf4j
public class RabbitMQFullConfig {
// ========== 正常業(yè)務(wù)隊(duì)列配置 ==========
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange", true, false);
}
@Bean
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>();
// 死信交換器
args.put("x-dead-letter-exchange", "order.dlx.exchange");
// 死信路由鍵
args.put("x-dead-letter-routing-key", "order.dlx.key");
// 消息TTL(毫秒)
args.put("x-message-ttl", 30000);
// 隊(duì)列最大長(zhǎng)度
args.put("x-max-length", 10000);
return QueueBuilder.durable("order.queue")
.withArguments(args)
.build();
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.key");
}
// ========== 死信隊(duì)列配置 ==========
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("order.dlx.exchange", true, false);
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dl.queue")
.build();
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("order.dlx.key");
}
// ========== 重試隊(duì)列(延時(shí)隊(duì)列替代方案)==========
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delay.exchange",
"x-delayed-message", true, false, args);
}
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay.queue")
.build();
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue())
.to(delayExchange())
.with("delay.key")
.noargs();
}
}
4.消息生產(chǎn)者
@Component
@Slf4j
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
// 發(fā)送普通消息
public void sendOrderMessage(OrderDTO order) {
CorrelationData correlationData = new CorrelationData(order.getId());
rabbitTemplate.convertAndSend(
"order.exchange",
"order.key",
order,
message -> {
// 設(shè)置消息屬性
message.getMessageProperties()
.setExpiration("30000") // 消息TTL
.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
correlationData
);
// 確認(rèn)回調(diào)
correlationData.getFuture().addCallback(
result -> {
if (result.isAck()) {
log.info("消息發(fā)送成功: {}", order.getId());
}
},
ex -> log.error("消息發(fā)送失敗: {}", ex.getMessage())
);
}
// 發(fā)送延遲消息
public void sendDelayMessage(OrderDTO order, int delayTime) {
rabbitTemplate.convertAndSend(
"delay.exchange",
"delay.key",
order,
message -> {
message.getMessageProperties()
.setHeader("x-delay", delayTime);
return message;
}
);
}
}
5.消息消費(fèi)者(完整版)
@Component
@Slf4j
public class OrderMessageConsumer {
private static final int MAX_RETRY_COUNT = 3;
@Autowired
private MessageProducer messageProducer;
/**
* 監(jiān)聽訂單隊(duì)列
*/
@RabbitListener(queues = "order.queue")
public void handleOrderMessage(
@Payload OrderDTO order,
@Headers Map<String, Object> headers,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
log.info("收到訂單消息: {}", order);
// 模擬業(yè)務(wù)處理
boolean success = processOrderBusiness(order);
if (success) {
// 業(yè)務(wù)成功,確認(rèn)消息
channel.basicAck(deliveryTag, false);
log.info("訂單處理成功: {}", order.getId());
} else {
// 獲取重試次數(shù)
Integer retryCount = (Integer) headers.get("x-retry-count");
retryCount = (retryCount == null) ? 1 : retryCount + 1;
if (retryCount <= MAX_RETRY_COUNT) {
// 重試次數(shù)未超限,重新入隊(duì)
log.warn("訂單處理失敗,第{}次重試: {}", retryCount, order.getId());
// 設(shè)置重試計(jì)數(shù)
headers.put("x-retry-count", retryCount);
// 延遲重試
messageProducer.sendDelayMessage(order, 5000);
// 確認(rèn)消息,避免重新投遞
channel.basicAck(deliveryTag, false);
} else {
// 超過重試次數(shù),進(jìn)入死信隊(duì)列
log.error("訂單處理失敗次數(shù)超過上限,進(jìn)入死信隊(duì)列: {}", order.getId());
channel.basicNack(deliveryTag, false, false);
}
}
} catch (Exception e) {
log.error("處理訂單消息異常: {}", e.getMessage());
try {
// 拒絕消息,進(jìn)入死信隊(duì)列
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
log.error("拒絕消息失敗: {}", ex.getMessage());
}
}
}
/**
* 監(jiān)聽死信隊(duì)列
*/
@RabbitListener(queues = "order.dl.queue")
public void handleDeadLetterMessage(
@Payload OrderDTO order,
@Headers Map<String, Object> headers) {
log.error("收到死信消息: {}", order);
// 記錄死信消息
logDeadLetter(order, headers);
// 發(fā)送告警
sendAlert(order);
// 人工處理或其他補(bǔ)償措施
manualProcess(order);
}
/**
* 監(jiān)聽延遲隊(duì)列
*/
@RabbitListener(queues = "delay.queue")
public void handleDelayMessage(@Payload OrderDTO order) {
log.info("收到延遲消息,開始重試: {}", order);
// 重新發(fā)送到訂單隊(duì)列
messageProducer.sendOrderMessage(order);
}
private boolean processOrderBusiness(OrderDTO order) {
// 業(yè)務(wù)處理邏輯
// 返回true表示成功,false表示失敗
return new Random().nextBoolean();
}
private void logDeadLetter(OrderDTO order, Map<String, Object> headers) {
// 記錄死信日志
log.info("記錄死信: {}, headers: {}", order, headers);
}
private void sendAlert(OrderDTO order) {
// 發(fā)送告警通知
log.warn("發(fā)送告警: 訂單{}處理失敗", order.getId());
}
private void manualProcess(OrderDTO order) {
// 人工處理邏輯
log.info("等待人工處理訂單: {}", order.getId());
}
}
四、使用場(chǎng)景
1.訂單超時(shí)取消
// 訂單創(chuàng)建時(shí)發(fā)送延遲消息
public void createOrder(OrderDTO order) {
// 保存訂單
orderService.save(order);
// 發(fā)送30分鐘過期的消息
rabbitTemplate.convertAndSend(
"order.exchange",
"order.key",
order,
message -> {
message.getMessageProperties()
.setExpiration("1800000"); // 30分鐘
return message;
}
);
}
2.支付回調(diào)重試
// 支付回調(diào)失敗時(shí)進(jìn)入死信隊(duì)列,人工處理
@RabbitListener(queues = "payment.callback.queue")
public void handlePaymentCallback(PaymentDTO payment) {
if (!paymentService.processCallback(payment)) {
throw new RuntimeException("支付回調(diào)處理失敗");
}
}
3.庫存鎖定與釋放
// 庫存鎖定15分鐘后自動(dòng)釋放
public void lockInventory(String orderId) {
inventoryService.lock(orderId);
// 發(fā)送15分鐘后到期的消息
rabbitTemplate.convertAndSend(
"inventory.exchange",
"inventory.lock.key",
orderId,
message -> {
message.getMessageProperties()
.setExpiration("900000"); // 15分鐘
return message;
}
);
}
4.消息重試機(jī)制
// 分級(jí)重試策略
public class RetryStrategy {
// 第一次重試:5秒后
// 第二次重試:30秒后
// 第三次重試:5分鐘后
// 超過3次進(jìn)入死信隊(duì)列
}
五、優(yōu)點(diǎn)總結(jié)
- 可靠性:確保消息不丟失,即使處理失敗也有備份
- 靈活性:支持多種死信策略(超時(shí)、長(zhǎng)度限制、拒絕等)
- 可維護(hù)性:異常處理與正常業(yè)務(wù)邏輯分離
- 監(jiān)控性:死信隊(duì)列便于監(jiān)控和統(tǒng)計(jì)異常消息
- 可擴(kuò)展性:支持多種重試和補(bǔ)償機(jī)制
六、最佳實(shí)踐建議
- 合理設(shè)置TTL:根據(jù)業(yè)務(wù)需求設(shè)置合適的過期時(shí)間
- 監(jiān)控死信隊(duì)列:設(shè)置告警,及時(shí)處理死信消息
- 限制隊(duì)列大小:防止消息積壓
- 記錄詳細(xì)日志:便于問題排查
- 死信消息分析:定期分析死信原因,優(yōu)化系統(tǒng)
通過Spring Boot整合RabbitMQ死信隊(duì)列,可以構(gòu)建更加健壯、可靠的消息驅(qū)動(dòng)系統(tǒng),有效處理各種異常場(chǎng)景,提高系統(tǒng)的整體穩(wěn)定性。
到此這篇關(guān)于SpringBoot4.0整合RabbitMQ死信隊(duì)列詳解的文章就介紹到這了,更多相關(guān)SpringBoot RabbitMQ死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot使用RabbitMq延遲隊(duì)列和死信隊(duì)列詳解
- Springboot使用Rabbitmq的延時(shí)隊(duì)列+死信隊(duì)列實(shí)現(xiàn)消息延期消費(fèi)
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
- Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
- 關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- Springboot集成RabbitMQ死信隊(duì)列的實(shí)現(xiàn)
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
相關(guān)文章
java去除中文括號(hào)小括號(hào),或者英文括號(hào)的實(shí)例代碼
這篇文章主要介紹了java去除中文括號(hào)小括號(hào),或者英文括號(hào)的實(shí)例代碼,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09
Java實(shí)現(xiàn)mysql數(shù)據(jù)庫的自動(dòng)備份和自動(dòng)還原
這篇文章主要為大家詳細(xì)介紹了如何通過Java實(shí)現(xiàn)mysql數(shù)據(jù)庫的自動(dòng)備份和自動(dòng)還原,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解下2024-11-11
redis scan命令導(dǎo)致redis連接耗盡,線程上鎖的解決
這篇文章主要介紹了redis scan命令導(dǎo)致redis連接耗盡,線程上鎖的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-11-11
Java+MySql圖片數(shù)據(jù)保存與讀取的具體實(shí)例
之前一直沒有做過涉及到圖片存儲(chǔ)的應(yīng)用,最近要做的東東涉及到了這個(gè)點(diǎn),就做了一個(gè)小的例子算是對(duì)圖片存儲(chǔ)的初試吧2013-06-06
IDEA項(xiàng)目中配置Maven鏡像源(下載源)的詳細(xì)過程
Maven是一個(gè)能使我們的java程序開發(fā)節(jié)省時(shí)間和精力,是開發(fā)變得相對(duì)簡(jiǎn)單,還能使開發(fā)規(guī)范化的工具,下面這篇文章主要給大家介紹了關(guān)于IDEA項(xiàng)目中配置Maven鏡像源(下載源)的詳細(xì)過程,需要的朋友可以參考下2024-02-02
Java實(shí)戰(zhàn)項(xiàng)目之斗地主和斗牛游戲的實(shí)現(xiàn)
讀萬卷書不如行萬里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Java實(shí)現(xiàn)一個(gè)斗地主和一個(gè)斗牛游戲,大家可以在過程中查缺補(bǔ)漏,提升水平2021-11-11
java使用軟引用實(shí)現(xiàn)緩存機(jī)制示例
這篇文章主要為大家介紹了java使用軟引用實(shí)現(xiàn)緩存機(jī)制示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08

