Springboot結(jié)合rabbitmq實現(xiàn)的死信隊列
概述
RabbitMQ是流行的開源消息隊列系統(tǒng),使用erlang語言開發(fā)。為了保證訂單業(yè)務的消息數(shù)據(jù)不丟失,需要使用到RabbitMQ的死信隊列機制,當消息消費發(fā)生異常時,將消息投入死信隊列中。但由于對死信隊列的概念及配置不熟悉,導致曾一度陷入百度的汪洋大海,無法自拔,很多文章都看起來可行,但是實際上卻并不能幫我解決實際問題。最終,在官網(wǎng)文檔中找到了我想要的答案,通過官網(wǎng)文檔的學習,才發(fā)現(xiàn)對于死信隊列存在一些誤解,導致配置死信隊列之路困難重重。
詳細
一、運行效果


二、實現(xiàn)過程
①、先創(chuàng)建一個Springboot項目。然后在pom文件中添加 spring-boot-starter-amqp 和 spring-boot-starter-web 的依賴,接下來創(chuàng)建一個Config類,這里是關鍵:
package com.zyf.rabbitmqdeadletterdemo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.dead.letter.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.dead.letter.business.queueA";
public static final String BUSINESS_QUEUEB_NAME = "rabbitmq.dead.letter.business.queueB";
public static final String DEAD_LETTER_EXCHANGE = "rabbitmq.dead.letter.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueA.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueB.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "rabbitmq.dead.letter.deadletter.queueA";
public static final String DEAD_LETTER_QUEUEB_NAME = "rabbitmq.dead.letter.deadletter.queueB";
// 聲明業(yè)務Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 聲明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 聲明業(yè)務隊列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當前隊列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 聲明業(yè)務隊列B
@Bean("businessQueueB")
public Queue businessQueueB(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當前隊列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
}
// 聲明死信隊列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 聲明死信隊列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 聲明業(yè)務隊列A綁定關系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明業(yè)務隊列B綁定關系
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明死信隊列A綁定關系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 聲明死信隊列B綁定關系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}②、接下來,是業(yè)務隊列的消費代碼:
@Slf4j@Componentpublic class BusinessMessageReceiver { @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到業(yè)務消息A:{}", msg); boolean ack = true;
Exception exception = null; try { if (msg.contains("deadletter")){ throw new RuntimeException("dead letter exception");
}
} catch (Exception e){
ack = false;
exception = e;
} if (!ack){
log.error("消息消費發(fā)生異常,error msg:{}", exception.getMessage(), exception);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到業(yè)務消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}③、然后配置死信隊列的消費者:
@Componentpublic class DeadLetterMessageReceiver { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息A:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}④、為了方便測試,寫一個簡單的消息生產(chǎn)者,并通過controller層來生產(chǎn)消息。
@Componentpublic class BusinessMessageSender { @Autowired
private RabbitTemplate rabbitTemplate; public void sendMsg(String msg){
rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
}
}@RequestMapping("rabbitmq")@RestControllerpublic class RabbitMQMsgController { @Autowired
private BusinessMessageSender sender; @RequestMapping("sendmsg")
public void sendMsg(String msg){
sender.sendMsg(msg);
}
}三、項目結(jié)構(gòu)圖

四、補充總結(jié)
死信隊列其實并沒有什么神秘的地方,不過是綁定在死信交換機上的普通隊列,而死信交換機也只是一個普通的交換機,不過是用來專門處理死信的交換機。
總結(jié)一下死信消息的生命周期:
- 業(yè)務消息被投入業(yè)務隊列
- 消費者消費業(yè)務隊列的消息,由于處理過程中發(fā)生異常,于是進行了nck或者reject操作
- 被nck或reject的消息由RabbitMQ投遞到死信交換機中
- 死信交換機將消息投入相應的死信隊列
- 死信隊列的消費者消費死信消息
死信消息是RabbitMQ為我們做的一層保證,其實我們也可以不使用死信隊列,而是在消息消費異常時,將消息主動投遞到另一個交換機中,當你明白了這些之后,這些Exchange和Queue想怎樣配合就能怎么配合。比如從死信隊列拉取消息,然后發(fā)送郵件、短信、釘釘通知來通知開發(fā)人員關注?;蛘邔⑾⒅匦峦哆f到一個隊列然后設置過期時間,來進行延時消費。
到此這篇關于Springboot結(jié)合rabbitmq實現(xiàn)的死信隊列的文章就介紹到這了,更多相關Springboot rabbitmq死信隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊列和延遲隊列
- SpringBoot+RabbitMQ?實現(xiàn)死信隊列的示例
- 如何利用rabbitMq的死信隊列實現(xiàn)延時消息
- 深入分析RabbitMQ中死信隊列與死信交換機
- 關于SpringBoot整合RabbitMQ實現(xiàn)死信隊列
- 關于Rabbitmq死信隊列及延時隊列的實現(xiàn)
- RabbitMQ之死信隊列深入解析
- springboot中RabbitMQ死信隊列的實現(xiàn)示例
- SpringBoot整合RabbitMQ實現(xiàn)延遲隊列和死信隊列
- springboot整合RabbitMQ中死信隊列的實現(xiàn)
相關文章
Java中如何快速構(gòu)建項目腳手架的實現(xiàn)
這篇文章主要介紹了Java中如何快速構(gòu)建項目腳手架,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-05-05
java階乘計算獲得結(jié)果末尾0的個數(shù)代碼實現(xiàn)
今天偶然看到一個要求,求1000~10000之間的數(shù)n的階乘并計算所得的數(shù)n!末尾有多少個0?要求: 不計算 只要得到末尾有多少個0就可以了,看下面的代碼吧2013-12-12
Java創(chuàng)建對象(顯式創(chuàng)建和隱含創(chuàng)建)
本文詳細介紹對象的創(chuàng)建,在 Java 語言中創(chuàng)建對象分顯式創(chuàng)建與隱含創(chuàng)建兩種情況,顯式創(chuàng)建和隱含創(chuàng)建,,需要的朋友可以參考下面文章的具體內(nèi)容2021-09-09

