Spring Boot系列教程之死信隊列詳解
前言
在說死信隊列之前,我們先介紹下為什么需要用死信隊列。
如果想直接了解死信對接,直接跳入下文的"死信隊列"部分即可。
ack機制和requeue-rejected屬性
我們還是基于上篇《Spring Boot系列——7步集成RabbitMQ》的demo代碼來說。
在項目springboot-demo我們看到application.yaml文件部分配置內(nèi)容如下
... listener: type: simple simple: acknowledge-mode: auto concurrency: 5 default-requeue-rejected: true max-concurrency: 100 ...
其中
acknowledge-mode
該配置項是用來表示消息確認方式,其有三種配置方式,分別是none、manual和auto。
none意味著沒有任何的應(yīng)答會被發(fā)送。
manual意味著監(jiān)聽者必須通過調(diào)用Channel.basicAck()來告知所有的消息。
auto意味著容器會自動應(yīng)答,除非MessageListener拋出異常,這是默認配置方式。
default-requeue-rejected
該配置項是決定由于監(jiān)聽器拋出異常而拒絕的消息是否被重新放回隊列。默認值為true。
我一開始對于這個屬性有個誤解,我以為rejected是表示拒絕,所以將requeue-rejected連起來是拒絕重新放回隊列,后來查了資料明白這個屬性的功能才想起來rejected是個形容詞,其表示的應(yīng)該是被拒絕的消息
所以如果該屬性配置為true表示會重新放回隊列,如果配置為false表示不會放回隊列。
下面我們看看acknowledge-mode參數(shù)和default-requeue-rejected參數(shù)使用不同的組合方式,RabbitMQ是如何處理消息的。
代碼依然使用springboot-demo中的RabbitApplicationTests發(fā)送消息,使用Receiver類監(jiān)聽demo-queue隊列的消息。
對于Receiver類添加了一行代碼,該代碼模擬拋出異常
@Component
public class Receiver {
@RabbitListener(queues = "demo_queue")
public void created(String message) {
System.out.println("orignal message: " + message);
int i = 1/0;
}
}
acknowledge-mode=none, default-requeue-rejected=false

該配置不會確認消息是否正常消費,所以在控制臺沒有拋出任何異常。通過在RabbitMQ管理頁面也沒有看到重新放回隊列的消息
acknowledge-mode=none, default-requeue-rejected=true

同樣該配置不會確認消息是否正常消費,所以在控制臺沒有拋出任何異常。而且即使default-requeue-rejected配置為true因為沒有確認所以也沒有看到重新放回隊列的消息
acknowledge-mode=manual, default-requeue-rejected=false

該配置需要手動確認消息是否正常消費,但是代碼中并沒有手動確認,個人理解是因為沒有收到ack,所以消息又回到了隊列中。
acknowledge-mode=manual, default-requeue-rejected=true

該配置需要手動確認消息是否正常消費,但是代碼中并沒有手動確認,所以消息被重新放入到隊列中了,并且在控制臺發(fā)現(xiàn)還拋出了異常(這塊不是很清楚,default-requeue-rejected設(shè)置true和false帶來的不同效果,有了解的麻煩下方留言指教)。
acknowledge-mode=auto, default-requeue-rejected=false

該配置采用自動確認,從結(jié)果來看,是自動確認了。
從控制臺打印的結(jié)果可以看出Receiver方法執(zhí)行了3次,分別是前面兩條放回隊列的消息以及這次發(fā)送的消息,所以3條消息都消費了。
同時因為default-requeue-rejected設(shè)置為false,所以即使消費拋出異常,也沒有將消息放回隊列。
acknowledge-mode=auto, default-requeue-rejected=true

該配置同樣采用自動確認,從結(jié)果看出,沒有拋出異常(這塊也不是很理解),且因為default-requeue-rejected設(shè)置為true,所以消息重新回到隊列。
綜上羅列這么多情況只為說明有些情況下,如果消息消費出錯,因為配置問題導致消息丟失了。這在很多情況下是要命的,比如用戶支付的訂單號,如果因為拋異常等原因直接丟失是很要命的。
所以,我們需要有一個確保機制,能夠保證即使失敗的消息也能保存下來,這時候死信隊列就排上用場了。
死信隊列
死信隊列的整個設(shè)計思路是這樣的
生產(chǎn)者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者
下面我們通過網(wǎng)上的一個簡單的死信隊列的實現(xiàn)看看如何使用死信隊列。
@Bean("deadLetterExchange")
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
}
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 聲明 死信交換機
args.put("x-dead-letter-exchange", "DL_EXCHANGE");
// x-dead-letter-routing-key 聲明 死信路由鍵
args.put("x-dead-letter-routing-key", "KEY_R");
return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
}
@Bean("redirectQueue")
public Queue redirectQueue() {
return QueueBuilder.durable("REDIRECT_QUEUE").build();
}
/**
* 死信路由通過 DL_KEY 綁定鍵綁定到死信隊列上.
*
* @return the binding
*/
@Bean
public Binding deadLetterBinding() {
return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
}
/**
* 死信路由通過 KEY_R 綁定鍵綁定到死信隊列上.
*
* @return the binding
*/
@Bean
public Binding redirectBinding() {
return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
}
注意
聲明了一個direct模式的exchange。
聲明了一個死信隊列deadLetterQueue,該隊列配置了一些屬性x-dead-letter-exchange表明死信交換機,x-dead-letter-routing-key表明死信路由鍵,因為是direct模式,所以需要設(shè)置這個路由鍵。
聲明了一個替補隊列redirectQueue,變成死信的消息最終就是存放在這個隊列的。
聲明綁定關(guān)系,分別是死信隊列以及替補隊列和交換機的綁定。
那么如何模擬生成一個死信消息呢,可以在發(fā)送到DL_QUEUE的消息在10秒后失效,然后轉(zhuǎn)發(fā)到替補隊列中,代碼實現(xiàn)如下
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 設(shè)置編碼
messageProperties.setContentEncoding("utf-8");
// 設(shè)置過期時間10*1000毫秒
messageProperties.setExpiration("5000");
return message;
};
rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", content, messagePostProcessor);
}
執(zhí)行結(jié)果如下

消息首先進入DL_QUEUE,5秒后失效,被轉(zhuǎn)發(fā)到REDIRECT_QUEUE中。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
相關(guān)文章
Java連接SAP RFC實現(xiàn)數(shù)據(jù)抽取的示例詳解
這篇文章主要為大家學習介紹了Java如何連接SAP RFC實現(xiàn)數(shù)據(jù)抽取的功能,文中的示例代碼講解詳細,具有一定的參考價值,需要的可以了解下2023-08-08
Eclipse下使用ANT編譯提示OutOfMemory的解決方法
由于需要使用ANT編譯的代碼比較多,特別是在第一次變異的時候,會出現(xiàn)OutOfMemory錯誤。并提示更改ANT_OPTS設(shè)定。2009-04-04
在 Spring Boot 項目中實現(xiàn)文件下載功能
這篇文章主要介紹了在 Spring Boot 項目中實現(xiàn)文件下載功能,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-09-09
java volatile關(guān)鍵字作用及使用場景詳解
在本文里我們給大家分享的是關(guān)于java volatile關(guān)鍵字作用及使用場景的相關(guān)知識點內(nèi)容,需要的朋友們學習下。2019-08-08
詳解Spring 框架中切入點 pointcut 表達式的常用寫法
這篇文章主要介紹了詳解Spring 框架中切入點 pointcut 表達式的常用寫法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04

