Springboot集成RabbitMQ死信隊列的實現(xiàn)
關(guān)于死信隊列
在大多數(shù)的MQ中間件中,都有死信隊列的概念。死信隊列同其他的隊列一樣都是普通的隊列。在RabbitMQ中并沒有特定的“死信隊列”類型,而是通過配置,將其實現(xiàn)。
當(dāng)我們在創(chuàng)建一個業(yè)務(wù)的交換機和隊列的時候,可以配置參數(shù),指明另一個隊列為當(dāng)前隊列的死信隊列,在RabbitMQ中,死信隊列(嚴(yán)格的說應(yīng)該是死信交換機)被稱為DLX Exchange。當(dāng)消息“死掉”后,會被自動路由到DLX Exchange的queue中。
什么樣的消息會進(jìn)入死信隊列?
1.消息的TTL過期。
2.消費者對broker應(yīng)答Nack,并且消息禁止重回隊列。
3.Queue隊列長度已達(dá)上限。
場景分析
以用戶訂單支付為場景。在各大電商平臺上,訂單的都有待支付時間,通常為30min。當(dāng)用戶超過30min未支付訂單,該訂單的狀態(tài)應(yīng)該會變成“超時取消”,或類似的狀態(tài)值的改變。
如果不使用MQ,可以設(shè)計一個定時任務(wù),定時查詢數(shù)據(jù)庫,判斷訂單的狀態(tài)和支付時間是否已經(jīng)到期,若到期則修改訂單的狀態(tài)。但顯然,這不是一個很好的操作,頻繁訪問數(shù)據(jù)庫,造成不必要的資源浪費。
使用MQ,我們可以在下單的時候,當(dāng)訂單數(shù)據(jù)入庫后,發(fā)送一條Message到Queue中,并設(shè)置過期時間為30min或自定義的支付過期時間。
/**
* 發(fā)送帶有過期時間的消息
*/
@GetMapping("/sendDlx")
public void sendDlx() {
Order order = new Order();
order.setItemId(1);
order.setStatus(1);
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey,
JSON.toJSONString(order), message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 模擬,設(shè)置10S后消息過期
message.getMessageProperties().setExpiration("10000");
return message;
});
}
若30min后,還未有消費者(下游服務(wù))消費這條消息,那么該條消息就會被路由到死信隊列中。我們可以設(shè)置一個監(jiān)聽去監(jiān)聽死信隊列,當(dāng)收到死信隊列的消息后,則根據(jù)消息數(shù)據(jù),查詢數(shù)據(jù)庫訂單狀態(tài)是否還是待支付狀態(tài),若是,則修改成超時取消。
代碼實現(xiàn)
以下是demo,未做服務(wù)的拆分,因此整個流程都是單個服務(wù)實現(xiàn)的,所以就沒有下游服務(wù),但并不影響整體業(yè)務(wù)。
RabbitMQConfig
將需要的交換機,隊列,綁定都聲明成SpringBean。Spring會自動創(chuàng)建這些到RabbitMQ服務(wù)中。
@Value注解部分都是配置文件exchange、queue、routingKey的名稱。
/**
* @author wulei
*/
@Configuration
public class RabbitConfig {
@Value("${sunspring.order.exchange}")
private String orderExchange;
@Value("${sunspring.order.queue}")
private String orderQueue;
@Value("${sunspring.order.routingKey}")
private String orderRoutingKey;
@Value("${sunspring.dlx.exchange}")
private String dlxExchange;
@Value("${sunspring.dlx.queue}")
private String dlxQueue;
@Value("${sunspring.dlx.routingKey}")
private String dlxRoutingKey;
/**
* 聲明死信隊列
* @return DirectExchange
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(dlxExchange);
}
/**
* 聲明死信隊列
* @return Queue
*/
@Bean
public Queue dlxQueue() {
return new Queue(dlxQueue);
}
/**
* 綁定死信隊列到死信交換機
* @return Binding
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(dlxRoutingKey);
}
/**
* 聲明訂單業(yè)務(wù)交換機
* @return DirectExchange
*/
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(orderExchange);
}
/**
* 聲明訂單業(yè)務(wù)隊列
* @return Queue
*/
@Bean
public Queue orderQueue() {
Map<String,Object> arguments = new HashMap<>(2);
// 綁定該隊列到私信交換機
arguments.put("x-dead-letter-exchange",dlxExchange);
arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
return new Queue(orderQueue,true,false,false,arguments);
}
/**
* 綁定訂單隊列到訂單交換機
* @return Binding
*/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with(orderRoutingKey);
}
}
sunspring.order.exchange=sunspring_order_exchange sunspring.order.queue=sunspring_order_queue sunspring.order.routingKey=sunspring.order sunspring.dlx.exchange=sunspring_dlx_exchange sunspring.dlx.queue=sunspring.dlx.queue sunspring.dlx.routingKey=dlx
在聲明業(yè)務(wù)隊列時,創(chuàng)建了一個Map,并且put了兩個值,這兩個值就是死信隊列的聲明。
x-dead-letter-exchange:死信交換機的名稱
x-dead-letter-routing-key:死信交換機的路由鍵,因為demo中兩個交換機的類型都是direct的,因此路由鍵必須相同。
/**
* 聲明訂單業(yè)務(wù)隊列
* @return Queue
*/
@Bean
public Queue orderQueue() {
Map<String,Object> arguments = new HashMap<>(2);
// 綁定該隊列到私信交換機
arguments.put("x-dead-letter-exchange",dlxExchange);
arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
return new Queue(orderQueue,true,false,false,arguments);
}
監(jiān)控頁面
在exchange列表中有剛剛創(chuàng)建的業(yè)務(wù)交換機sunspring_order_exchange和死信交換機
sunspring_dlx_exchange

在Queue列表中,有死信隊列sunspring_dlx_queue和業(yè)務(wù)隊列sunspring_order_queue
并且業(yè)務(wù)隊列上有DLX標(biāo)記,可見當(dāng)前隊列已經(jīng)綁定了一個死信隊列。DLK表示的路由鍵。

場景模擬
生產(chǎn)者
生產(chǎn)者發(fā)送了一個過期時間為10S的消息。
message.getMessageProperties().setExpiration(“10000”);
/**
* 發(fā)送帶有過期時間的消息
*/
@GetMapping("/sendDlx")
public void sendDlx() {
Order order = new Order();
order.setItemId(1);
order.setStatus(1);
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey,
JSON.toJSONString(order), message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setExpiration("10000");
return message;
});
}
sunspring_order_queue接受到了一條消息,當(dāng)前消息的狀態(tài)是ready的,表示沒有任何消費者消費這條消息。

10s后,當(dāng)前消息路由到了死信隊列中,sunspring_order_queue消息數(shù)量變成0,sunspring_dlx_queue數(shù)量變成1。

消費者,設(shè)置死信隊列監(jiān)聽
通過設(shè)置對死信隊列的監(jiān)聽,可以發(fā)現(xiàn),在Springboot啟動之后,創(chuàng)建了對RabbitMQ的監(jiān)聽,死信隊列的消息也立刻被消費了。
因此,我們可以監(jiān)聽死信隊列,對未被消費的消息進(jìn)行下一步操作。如場景分析中的更改訂單狀態(tài)。
@RabbitListener(queues = "sunspring.dlx.queue")
public void dlxListener(Message message,Channel channel) throws IOException {
System.out.println(new String(message.getBody()));
//對消息進(jìn)行業(yè)務(wù)處理....
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
2019-08-20 20:05:05.158 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [120.27.243.91:5672]
2019-08-20 20:05:05.224 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#68ab0936:0/SimpleConnection@74606204 [delegate=amqp://guest@120.27.243.91:5672/, localPort= 13563]
{"itemId":1,"status":1}
到此這篇關(guān)于Springboot集成RabbitMQ死信隊列的實現(xiàn)的文章就介紹到這了,更多相關(guān)Springboot RabbitMQ死信隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
- SpringAMQP消息隊列(SpringBoot集成RabbitMQ方式)
- 一文掌握Springboot集成RabbitMQ的方法
- springboot2.5.6集成RabbitMq實現(xiàn)Topic主題模式(推薦)
- SpringBoot集成RabbitMQ的方法(死信隊列)
- springboot2.0集成rabbitmq的示例代碼
- Spring Boot系列教程之7步集成RabbitMQ的方法
- springboot集成rabbitMQ之對象傳輸?shù)姆椒?/a>
- spring boot集成rabbitmq的實例教程
- 詳解spring boot集成RabbitMQ
- Spring Boot 3 集成 RabbitMQ 實踐指南(原理解析)
相關(guān)文章
springboot使用CommandLineRunner解決項目啟動時初始化資源的操作
這篇文章主要介紹了springboot使用CommandLineRunner解決項目啟動時初始化資源的操作,幫助大家更好的理解和學(xué)習(xí)使用springboot框架,感興趣的朋友可以了解下2021-02-02
簡談java并發(fā)FutureTask的實現(xiàn)
這篇文章主要介紹了簡談java并發(fā)FutureTask的實現(xiàn),FutureTask都是用于獲取線程執(zhí)行的返回結(jié)果。文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,,需要的朋友可以參考下2019-06-06
java實現(xiàn)大數(shù)加法(BigDecimal)的實例代碼
之前寫過用vector、string實現(xiàn)大數(shù)加法,現(xiàn)在用java的BigDecimal類,代碼簡單很多。但是在online-judge上,java的代碼運行時間和內(nèi)存大得多2013-10-10
Java 實戰(zhàn)項目錘煉之仿天貓網(wǎng)上商城的實現(xiàn)流程
讀萬卷書不如行萬里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+jsp+servlet+mysql+ajax實現(xiàn)一個仿天貓網(wǎng)上商城項目,大家可以在過程中查缺補漏,提升水平2021-11-11

