RabbitMQ死信機(jī)制實(shí)現(xiàn)延遲隊(duì)列的實(shí)戰(zhàn)
延遲隊(duì)列
延遲隊(duì)列存儲(chǔ)的對(duì)象肯定是對(duì)應(yīng)的延時(shí)消息,所謂”延時(shí)消息”是指當(dāng)消息被發(fā)送以后,并不想讓消費(fèi)者立即拿到消息,而是等待指定時(shí)間后,消費(fèi)者才拿到這個(gè)消息進(jìn)行消費(fèi)。
應(yīng)用場(chǎng)景
三方支付,掃碼支付調(diào)用上游的掃碼接口,當(dāng)掃碼有效期過后去調(diào)用查詢接口查詢結(jié)果。實(shí)現(xiàn)方式:每當(dāng)一筆掃碼支付請(qǐng)求后,立即將此訂單號(hào)放入延遲隊(duì)列中(RabbitMQ),隊(duì)列過期時(shí)間為二維碼有效期,此隊(duì)列沒有設(shè)置消費(fèi)者,過了有效期后消息會(huì)重新路由到指定的的隊(duì)列,有消費(fèi)者去執(zhí)行訂單查詢。
RabbitMQ本身沒有直接支持延遲隊(duì)列功能,但是可以通過以下特性模擬出延遲隊(duì)列的功能。 但是我們可以通過RabbitMQ的兩個(gè)特性來曲線實(shí)現(xiàn)延遲隊(duì)列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)
Time To Live(TTL)
RabbitMQ可以針對(duì)Queue和Message設(shè)置 x-message-tt,來控制消息的生存時(shí)間,如果超時(shí),則消息變?yōu)閐ead letter(死信)RabbitMQ針對(duì)隊(duì)列中的消息過期時(shí)間有兩種方法可以設(shè)置。
A: 通過隊(duì)列屬性設(shè)置,隊(duì)列中所有消息都有相同的過期時(shí)間。
<!-- 將消息放入此隊(duì)列里,此隊(duì)列設(shè)置過期時(shí)間,不制造消費(fèi)者讓其過期,過期后變成死信,消息會(huì)放入指定的新隊(duì)列里,實(shí)現(xiàn)消息的延遲消費(fèi) -->
<rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" >
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">${qrcode.expire.icbc}</value>
</entry>
<!--消息過期根據(jù)重新路由 -->
<entry key="x-dead-letter-exchange" value="directExchange"/>
<entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/>
</rabbit:queue-arguments>
</rabbit:queue>
B: 對(duì)消息進(jìn)行單獨(dú)設(shè)置,每條消息TTL可以不同。
<!-- 將消息放入此隊(duì)列里,次隊(duì)列設(shè)置過期時(shí)間,不制造消費(fèi)者讓其過期,過期后變成死信,消息會(huì)放入指定的新隊(duì)列里,實(shí)現(xiàn)消息的延遲消費(fèi) -->
<rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" >
<rabbit:queue-arguments>
<!--消息過期根據(jù)重新路由 -->
<entry key="x-dead-letter-exchange" value="directExchange"/>
<entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/>
</rabbit:queue-arguments>
</rabbit:queue>
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
package com.emax.paycenter.mq.pruducer;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
private final Long ttl;
public ExpirationMessagePostProcessor(Long ttl) {
this.ttl = ttl;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(ttl.toString());
return message;
}
}
如果同時(shí)使用,則消息的過期時(shí)間以兩者之間TTL較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列的生存時(shí)間一旦超過設(shè)置的TTL值,就成為dead letter
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù),如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由。
x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
x-dead-letter-routing-key:指定routing-key發(fā)送
隊(duì)列出現(xiàn)dead letter的情況有:
消息或者隊(duì)列的TTL過期
隊(duì)列達(dá)到最大長度
消息被消費(fèi)端拒絕(basic.reject or basic.nack)并且requeue=false
利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信后,它能被重新publish到另一個(gè)Exchange。這時(shí)候消息就可以重新被消費(fèi)。

注意一:ttl設(shè)置之后,下次修改時(shí)間,會(huì)報(bào)錯(cuò),這時(shí)候,需要先刪除該隊(duì)列,重啟項(xiàng)目。否則會(huì)報(bào)錯(cuò)。
注意二:消費(fèi)者中,拋異常了沒處理,會(huì)一直重復(fù)消費(fèi)
注意三:下面的代碼我模擬了1-10號(hào)消息,消息的內(nèi)容里面是1-10。過期的時(shí)間是10-1秒。這里要注意,雖然10是第一個(gè)發(fā)送,但是它過期的時(shí)間最長。
過了10s以后,消費(fèi)者開始收到數(shù)據(jù),但是它是一次性收到如下結(jié)果:10、9 、8 、7 、6、5 、4 、3 、2 、1
Consumer第一個(gè)收到的還是10。10是第一個(gè)放進(jìn)隊(duì)列,但是它的過期時(shí)間最長。所以由此可見,即使一個(gè)消息比在同一隊(duì)列中的其他消息提前過期,提前過期的也不會(huì)優(yōu)先進(jìn)入死信隊(duì)列,它們還是按照入庫的順序讓消費(fèi)者消費(fèi)。如果第一進(jìn)去的消息過期時(shí)間是1小時(shí),那么死信隊(duì)列的消費(fèi)者也許等1小時(shí)才能收到第一個(gè)消息。參考官方文檔發(fā)現(xiàn)“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有當(dāng)過期的消息到了隊(duì)列的頂端(隊(duì)首),才會(huì)被真正的丟棄或者進(jìn)入死信隊(duì)列。
所以在考慮使用RabbitMQ來實(shí)現(xiàn)延遲任務(wù)隊(duì)列的時(shí)候,需要確保業(yè)務(wù)上每個(gè)任務(wù)的延遲時(shí)間是一致的。如果遇到不同的任務(wù)類型需要不同的延時(shí)的話,需要為每一種不同延遲時(shí)間的消息建立單獨(dú)的消息隊(duì)列。(也可以考慮緩存隊(duì)列,DelayQueue實(shí)現(xiàn)定時(shí)延遲執(zhí)行任務(wù),但是也有缺點(diǎn):就是項(xiàng)目重啟緩存里的數(shù)據(jù)就會(huì)丟失,DelayQueue的使用詳見其他博文)
for(int i = 10; i>0; i-- ){
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
}
到此這篇關(guān)于RabbitMQ死信機(jī)制實(shí)現(xiàn)延遲隊(duì)列的實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)RabbitMQ 延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方式詳解
- 手把手帶你掌握SpringBoot RabbitMQ延遲隊(duì)列
- 如何通過Python實(shí)現(xiàn)RabbitMQ延遲隊(duì)列
- RabbitMQ延遲隊(duì)列及消息延遲推送實(shí)現(xiàn)詳解
- Rabbitmq延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)的方法
- Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列的示例
- C#實(shí)現(xiàn)rabbitmq 延遲隊(duì)列功能實(shí)例代碼
- rabbitmq延遲隊(duì)列的使用方式
相關(guān)文章
SpringBoot?對(duì)接飛書多維表格事件回調(diào)監(jiān)聽流程分析
本文介紹了如何通過飛書事件訂閱機(jī)制和SpringBoot項(xiàng)目集成,對(duì)多維表數(shù)據(jù)的記錄變更進(jìn)行對(duì)接的詳細(xì)流程,包括如何創(chuàng)建應(yīng)用、配置參數(shù)、編寫訂閱代碼、訂閱文檔事件以及在SpringBoot工程中集成的步驟,感興趣的朋友跟隨小編一起看看吧2024-12-12
關(guān)于MyBatis中Mapper?XML熱加載優(yōu)化
大家好,本篇文章主要講的是關(guān)于MyBatis中Mapper?XML熱加載優(yōu)化,感興趣的同學(xué)趕快來看一看吧,對(duì)你有幫助的話記得收藏一下2022-01-01
Java項(xiàng)目防止SQL注入的幾種方法總結(jié)
SQL注入是比較常見的網(wǎng)絡(luò)攻擊方式之一,在客戶端在向服務(wù)器發(fā)送請(qǐng)求的時(shí)候,sql命令通過表單提交或者url字符串拼接傳遞到后臺(tái)持久層,最終達(dá)到欺騙服務(wù)器執(zhí)行惡意的SQL命令,下面這篇文章主要給大家總結(jié)介紹了關(guān)于Java項(xiàng)目防止SQL注入的幾種方法,需要的朋友可以參考下2023-04-04
Netty學(xué)習(xí)之理解selector原理示例
這篇文章主要為大家介紹了Netty學(xué)習(xí)之理解selector原理示例使用分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪<BR>2023-07-07
springboot使用redisTemplate操作lua腳本
本文主要介紹了springboot使用redisTemplate操作lua腳本,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08
Log4j2?重大漏洞編譯好的log4j-2.15.0.jar包下載(替換過程)
Apache?開源項(xiàng)目?Log4j?的遠(yuǎn)程代碼執(zhí)行漏洞細(xì)節(jié)被公開,由于?Log4j?的廣泛使用,該漏洞一旦被攻擊者利用會(huì)造成嚴(yán)重危害,下面小編給大家?guī)砹薒og4j2?重大漏洞編譯好的log4j-2.15.0.jar包下載,感興趣的朋友一起看看吧2021-12-12

