springcloud中RabbitMQ死信隊列與延遲交換機實現(xiàn)方法
0.引言
死信隊列是消息隊列中非常重要的概念,同時我們需要業(yè)務場景中都需要延遲發(fā)送的概念,比如12306中的30分鐘后未支付訂單取消。那么本期,我們就來講解死信隊列,以及如何通過延遲交換機來實現(xiàn)延遲發(fā)送的需求。
1. 死信隊列
1.2 什么是死信?
理解死信隊列前,我們先講解什么是死信,所謂死信就是沒有被成功消費的消息,但并不是所有未成功消費的消息都是死信消息,死信消息的產(chǎn)生來源于以下三種途徑: (1)消息被消費者拒絕,參數(shù)requeue設置為false的消息 (2)過期的消息,過期消息分為兩種: a. 發(fā)送消息時,設置了某一條消息的生存時間(message TTL),如果生存時間到了,消息還沒有被消費,就會被標注為死信消息 b. 設置了隊列的消息生存時間,針對隊列中所有的消息,如果生存時間到了,消息還沒有被消費,就會被標注為死信消息 (3)當隊列達到了最大長度后,再發(fā)送過來的消息就會直接變成死信消息
1.3 什么是死信隊列?
直接來講,用來盛裝死信的隊列就是死信隊列,好像是一句廢話,所以其重點在于理解死信的概念。
死信隊列的作用: (1)隊列在已滿的情況下,會將消息發(fā)送到死信隊列中,這樣消息就不會丟失了,回頭再從死信隊列里將消息取出來進行消費即可 (2)可以基于死信隊列實現(xiàn)延遲消費的效果。具體的實現(xiàn)我們后續(xù)講解
1.4 創(chuàng)建死信交換機、死信隊列
死信交換機、死信隊列其實都是普通的交換機、隊列,只是專門聲明出來用于存儲死信消息的。我們只需要通過deadLetterExchange方法來聲明死信交換機,然后用deadLetterRoutingKey方法來聲明死信隊列
如下代碼所示,我們創(chuàng)建了test.queue、test.exchange及dead.queue、dead.exchange,并且在test.queue中將死信交換機和死信路由指定到了測試隊列中
注意:涉及到修改隊列、交換機屬性的,如果該隊列、交換機已經(jīng)存在需要將其刪除后才能生效,否則可能還會報錯。
@Configuration
public class RabbitMqConfig {
private static final String TEST_EXCHANGE = "test.exchange";
private static final String TEST_QUEUE = "test.queue";
private static final String TEST_ROUTING_KEY = "test.routing.key";
private static final String DEAD_EXCHANGE = "dead.exchange";
private static final String DEAD_QUEUE = "dead.queue";
private static final String DEAD_ROUTING_KEY = "dead.routing.key";
@Bean
public Queue deadQueue(){
return new Queue(DEAD_QUEUE);
}
public DirectExchange deadExchange(){
// 設置演示,使用了直接交換機Direct,大家可以根據(jù)自己的業(yè)務情況聲明為其他類型的交換機
return new DirectExchange(DEAD_EXCHANGE);
public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
public Queue testQueue(){
return QueueBuilder.durable(TEST_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
public DirectExchange testExchange(){
return new DirectExchange(TEST_EXCHANGE);
public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTING_KEY);
}1.5 實現(xiàn)死信消息
1.5.1 基于消費者進行reject或nack實現(xiàn)死信消息
@Component
public class QueueListener {
@RabbitListener(queues = RabbitMqConfig.TEST_QUEUE)
public void handler(MyMessage messageInfo, Message message, Channel channel) {
try{
System.out.println("接收的消息:"+messageInfo.toString());
// requeue參數(shù)設置為false 設置死信消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// multiple和requeue設置為false 設置死信消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
// 返回ack 確認接收到消息
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (IOException e){
try {
channel.basicRecover();
} catch (IOException ex) {
ex.printStackTrace();
log.error("消息處理失?。簕}",e.getMessage());
}
}
}
}1.5.2 基于生存時間實現(xiàn)
(1)發(fā)送消息時設置生存時間
@GetMapping("sendTestQueueWithExpiration")
public String sendTestQueueWithExpiration(){
MyMessage message = new MyMessage(1L,"物流提醒","到達裝貨區(qū)域,注意上傳憑證",new Date());
rabbitTemplate.convertAndSend(RabbitMqConfig.TEST_EXCHANGE,RabbitMqConfig.TEST_ROUTING_KEY, message,msg -> {
msg.getMessageProperties().setExpiration("5000");
return msg;
});
return "發(fā)送成功";
}(2)隊列設置生存時間
@Bean
public Queue testQueue(){
return QueueBuilder.durable(TEST_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
// 10s 過期
.ttl(10000)
.build();
}1.5.3 基于隊列max_length實現(xiàn)
@Bean
public Queue testQueue(){
return QueueBuilder.durable(TEST_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
// 容量最大100條
.maxLength(100)
.build();
}1.6 基于死信隊列實現(xiàn)消息延遲發(fā)送
上述我們說過死信隊列還可以消息延遲發(fā)送,其思路就是: (1)消息發(fā)送時設置消息的生存時間,其生存時間就是我們想要延遲的時間 (2)消息者監(jiān)控死信隊列進行消費
正常隊列的消息因為沒有消費者消費,同時又指定了生存時間,到達時間后消息轉發(fā)到死信隊列中,消費者監(jiān)聽了死信隊列從而將其消費掉。
基于死信隊列實現(xiàn)消息延遲發(fā)送的問題
如果有兩個消息,一個是5s生存時間,一個是10s生存時間,當我們先發(fā)送了10s生存時間的消息到queue中時,因為rabbitmq只會監(jiān)控隊列最外側的消息的生存時間,也就是監(jiān)控10s生存時間的消息,而5s生存時間的消息只會在最外側的10s消息到期后才會監(jiān)控,也就導致我實際需要5s生存的消息,實際需要10s才監(jiān)聽到了。
所以呢,基于死信隊列實現(xiàn)的延遲消息,只使用于延遲時間一致的消息。
為了適配更多的延遲場景,已經(jīng)更加簡單的實現(xiàn)延遲消息,我們引入了延遲交換機
2. 延遲交換機
延遲交換機并不是rabbitmq自帶的功能,而是要通過安裝延遲交換機插件delayed_message_exchange來實現(xiàn)
其插件的安裝我們之間已經(jīng)講解過,不再累敘,可以參考如下博文 springcloud:安裝rabbitmq并配置延遲隊列插件
通過延遲交換機實現(xiàn)的延遲消息,其重點主要在交換機上,隊列就是普通隊列,消息發(fā)送到交換機上后,會記錄消息的延遲時間,到達時間后才會發(fā)送到隊列中,這樣消費者通過監(jiān)控隊列,就能在指定時間獲取到消息
因此延遲交換機與普通交換機的實現(xiàn),只在創(chuàng)建交換機時,其他的操作與普通交換機無異,因此使用起來也很方便
創(chuàng)建延遲交換機,通過x-delayed-type屬性聲明交換機類型,可以是direct也可以是topic,具體支持4中交換機類型,如果不清楚的可以參考之前的博文
@Configuration
public class RabbitMqDelayConfig {
public static final String DELAY_EXCHANGE = "delay.exchange";
public static final String DELAY_QUEUE = "delay.queue";
public static final String DELAY_ROUTING_KEY = "delay.routing.key";
@Bean
public Exchange delayExchange(){
Map<String, Object> arguments = new HashMap<>(1);
arguments.put("x-delayed-type","direct");
return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
}
@Bean
public Queue delayQueue(){
return new Queue(DELAY_QUEUE);
}
@Bean
public Binding delayBinding(Queue delayQueue, Exchange delayExchange){
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
}
}發(fā)送消息時指定延遲時間,單位毫秒
rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(30000);
return message;
}
});我們還可以將該方法封裝為工具類方法,方便之后調(diào)用
/**
* 發(fā)送 延遲隊列
* @param exchange 交換機
* @param routeKey 路由
* @param message 消息
* @param delaySecond 延遲秒數(shù)
*/
public void send(String exchange, String routeKey, Object message, int delaySecond){
rabbitTemplate.convertAndSend(exchange,routeKey,message,msg -> {
// 消息持久化
msg.getMessageProperties().setDelay(delaySecond * 1000);
return msg;
});
}3. 應用場景
延遲消息的應用場景豐富,除了我們開篇所說的30分鐘未支付自動取消訂單,還比如到貨后72小時未簽收自動簽收
基本上所有需要延遲觸發(fā)的業(yè)務場景都可以用rabbitmq延遲隊列來實現(xiàn)。
4. 練習題
對于剛接觸rabbitmq的同學,這里我提供一個練習題給大家,也讓大家在實操中加強對于rabbitmq的理解:
需求:訂單到貨后72小時未簽收,自動簽收 講解:我們這里要實現(xiàn)訂單到貨后的自動簽收功能,訂單到貨后會觸發(fā)發(fā)送自動簽收消息的方法,訂單已簽收的狀態(tài)status為2,到貨狀態(tài)為1,如果72小時前已經(jīng)簽收了即status被更新為2了,那么需要取消自動簽收(不執(zhí)行自動簽收,即忽略自動簽收消息)
到此這篇關于springcloud:RabbitMQ死信隊列與延遲交換機實現(xiàn)的文章就介紹到這了,更多相關springcloud RabbitMQ死信隊列與延遲交換機內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringCloud超詳細講解微服務網(wǎng)關Zuul基礎
這篇文章主要介紹了SpringCloud?Zuul微服務網(wǎng)關,負載均衡,熔斷和限流,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-10-10
IDEA 中創(chuàng)建并部署 JavaWeb 程序的方法步驟(圖文)
本文主要介紹了IDEA 中創(chuàng)建并部署 JavaWeb 程序的方法步驟,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02
將SpringBoot項目無縫部署到Tomcat服務器的操作流程
SpringBoot 是一個用來簡化 Spring 應用初始搭建以及開發(fā)過程的框架,我們可以通過內(nèi)置的 Tomcat 容器來輕松地運行我們的應用,本文給大家介紹 SpringBoot 項目部署到獨立 Tomcat 服務器的操作流程,需要的朋友可以參考下2024-05-05
如何解決java.lang.IllegalStateException: Target host&n
文章描述了通過MocoRunner模擬接口,并使用properties文件和ResourceBundle讀取配置文件進行get請求的過程,在執(zhí)行過程中遇到了目標主機為空的錯誤,通過檢查和修正url拼接問題解決了該錯誤2024-12-12
IDEA Error:java: 無效的源發(fā)行版: 17錯誤
本文主要介紹了IDEA Error:java: 無效的源發(fā)行版: 17錯誤,這個錯誤是因為您的IDEA編譯器不支持Java 17版本,您需要更新您的IDEA編譯器或者將您的Java版本降級到IDEA支持的版本,本文就來詳細的介紹一下2023-08-08

