java中RabbitMQ高級應(yīng)用
1、消息可靠性投遞
在使用 RabbitMQ 的時(shí)候,生產(chǎn)者在進(jìn)行消息投遞的時(shí)候如果想知道消息是否成功的投遞到對應(yīng)的交換機(jī)和隊(duì)列中,有兩種方式可以用來控制消息投遞的可靠性模式 。

由上圖的整個消息的投遞過程來看,生產(chǎn)者的消息進(jìn)入到中間件中會首先到達(dá)交換機(jī),然后再從交換機(jī)傳遞到隊(duì)列中去,也就是分為兩步走戰(zhàn)略。那么消息的丟失情況也就是會出現(xiàn)在這兩個階段中,RabbitMQ 貼心的為我們提供了針對于這兩個部分的可靠新傳遞模式:
- confirm 模式。
- return 模式。
利用這兩個回調(diào)模式來確保消息的傳遞可靠。
1.1、確認(rèn)模式
消息從生產(chǎn)者到交換機(jī)之間傳遞會返回一個 confirmCallback 的回調(diào)??梢灾苯釉?rabbitTemplate 實(shí)例中進(jìn)行確認(rèn)邏輯的設(shè)置。如果是使用 XML 配置的話需要在工廠配置開啟 publisher-confirms="true",YAML 的配置就直接 publisher-confirm-type: correlated,他默認(rèn)是 NONE ,需要手動開啟。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void producer() throws InterruptedException {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println();
if (!b) {
// 消息重發(fā)之類的處理
System.out.println(s);
} else {
System.out.println("交換機(jī)成功接收消息");
}
}
});
rabbitTemplate.convertAndSend("default_exchange", "default_queue",
"hello world & beordie");
TimeUnit.SECONDS.sleep(5);
}
} 上面的確認(rèn)是由一個 confirm 的函數(shù)執(zhí)行的,里面攜帶了三個參數(shù),第一個是配置的相關(guān)信息,第二個表示交換機(jī)是否成功的接收到消息,第三個參數(shù)是指沒有成功接收消息的原因。
1.2、退回模式
從交換機(jī)到消息隊(duì)列投遞失敗會返回一個 returnCallback 。在工廠配置中開啟回退模式 publisher-returns="true" ,設(shè)置交換機(jī)處理消息失敗的模式(默認(rèn) false 直接將消息進(jìn)行丟棄),添加退回處理的邏輯。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void producer() throws InterruptedException {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 重發(fā)邏輯處理
System.out.println(message.getBody() + " 投遞消息隊(duì)列失敗");
}
});
rabbitTemplate.convertAndSend("default_exchange", "default_queue",
"hello world & beordie");
TimeUnit.SECONDS.sleep(5);
}
} returnedMessage 中攜帶五個參數(shù)、分別指的是消息對象、錯誤碼、錯誤信息、交換機(jī)、路由鍵。
1.3、確認(rèn)機(jī)制
在消費(fèi)者抓取消息隊(duì)列中的數(shù)據(jù)取消費(fèi)之后會有一個確認(rèn)機(jī)制進(jìn)行消息的確認(rèn),防止因?yàn)樽ト∠⒅蟮珱]有消費(fèi)成功而導(dǎo)致的消息丟失。有三種確認(rèn)方式:
自動確認(rèn):
acknowledge="none"手動確認(rèn):
acknowledge="manual"根據(jù)異常情況確認(rèn):
acknowledge="auto"
其中自動確認(rèn)是指一旦消息被消費(fèi)者抓取就自動默認(rèn)成功,并將消息從消息隊(duì)列中進(jìn)行移除,如果這個時(shí)候消費(fèi)端消費(fèi)出現(xiàn)問題,那么也會是默認(rèn)消息消費(fèi)成功,但是實(shí)際上是沒有消費(fèi)成功的,也就是當(dāng)前的消息丟失了。默認(rèn)的情況就是自動確認(rèn)機(jī)制。
如果設(shè)置手動確認(rèn)的方式,就需要在正常消費(fèi)消息之后進(jìn)行回調(diào)確認(rèn) channel.basicAck(),手動簽收。如果業(yè)務(wù)處理過程中發(fā)生了異常則調(diào)用 channel.basicNack() 重新發(fā)送消息。
首先需要在隊(duì)列綁定時(shí)進(jìn)行確認(rèn)機(jī)制的配置,設(shè)置為手動簽收。
<!-- 綁定隊(duì)列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual">
<rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>生產(chǎn)者一端不用更改,只需要改變消費(fèi)者的實(shí)現(xiàn)進(jìn)行消息自動簽收就可以了,正常執(zhí)行業(yè)務(wù)則簽收消息,業(yè)務(wù)發(fā)生錯誤則選擇消息拒簽,消息重發(fā)或者丟棄。
public class ConsumerAck implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 消息唯一ID
long tag = message.getMessageProperties().getDeliveryTag();
try {
String msg = new String(message.getBody(), "utf-8");
channel.basicAck(tag, true);
System.out.println("接收消息: " + msg);
} catch (Exception e) {
System.out.println("接收消息異常");
channel.basicNack(tag, true, true);
e.printStackTrace();
}
}
} 里面涉及三個簡單的簽收函數(shù),一是正確簽收的 basicAck ,二是單條拒簽的 basicReject ,三是批量拒簽的 basicNack 。
- basicAck 第一個參數(shù)表示消息在通道中的唯一ID,只針對當(dāng)前的 Channel;第二個參數(shù)表示是否批量同意,如果是 false 的話只會同意簽收當(dāng)前ID的一條消息,將其從消息隊(duì)列中進(jìn)行刪除,而如果是 true 的話將會把此ID之前的消息一起給同意簽收了。
- basicReject 第一個參數(shù)依舊表示消息的唯一ID,第二個參數(shù)表示是否重新回隊(duì)發(fā)送,false 表示直接丟棄該條消息或者有死信隊(duì)列可以接收, true 則表示重新回隊(duì)進(jìn)行消息發(fā)送,所有操作只針對當(dāng)前的消息。
- basicNack 比第二個多了一個參數(shù),也就是處于中間位置的布爾值,表示是否批量進(jìn)行。
2、消費(fèi)端限流
在用戶請求和DB服務(wù)處理之間增加消息中間件的隔離,使得突發(fā)流量全部讓消息隊(duì)列來抗,降低服務(wù)端被沖垮的可能性。讓所有的請求都往隊(duì)列中存,消費(fèi)端只需要勻速的取出消息進(jìn)行消費(fèi),這樣就能保證運(yùn)行效率,也不會因?yàn)楹笈_的阻塞而導(dǎo)致客戶端得不到正常的響應(yīng)(當(dāng)然指的是一些不需要同步回顯的任務(wù))。

只需要在消費(fèi)者綁定消息隊(duì)列時(shí)指定取出消息的速率即可,需要使用手動簽收的方式,每進(jìn)行一次的簽收才會從隊(duì)列中再取出下一條數(shù)據(jù)。
<!-- 綁定隊(duì)列 -->
<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true"
acknowledge="manual" prefetch="1">
<rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>
</rabbit:listener-container>3、消息過期時(shí)間
消息隊(duì)列提供了存儲在隊(duì)列中消息的過期時(shí)間,分為兩個方向的實(shí)現(xiàn),一個是針對于整個隊(duì)列中的所有消息,也就是隊(duì)列的過期時(shí)間,另一個是針對當(dāng)前消息的過期時(shí)間,也就是針對于單條消息單獨(dú)設(shè)置。
隊(duì)列的過期時(shí)間設(shè)置很簡單,只需要在創(chuàng)建隊(duì)列時(shí)進(jìn)行過期時(shí)間的指定即可,也可以通過控制臺直接創(chuàng)建指定過期時(shí)間。一旦隊(duì)列過期時(shí)間到了,隊(duì)列中還未被消費(fèi)的消息都將過期,進(jìn)行隊(duì)列的過期處理。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>單條消息的過期時(shí)間需要在發(fā)送的時(shí)候進(jìn)行單獨(dú)的指定,發(fā)送的時(shí)候指定配置的額外信息,配置的編寫由配置類完成。
如果一條消息的過期時(shí)間到了,但是他此時(shí)處于隊(duì)列的中間,那么他將不會被處理,只有當(dāng)之后處理到時(shí)候才會進(jìn)行判斷是否過期。
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws
AmqpException {
// 設(shè)置 message 的過期時(shí)間
message.getMessageProperties().setExpiration("5000");
// 返回該消息
return message;
}
};
rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);如果說同時(shí)設(shè)置了消息的過期時(shí)間和隊(duì)列的過期時(shí)間,那么最終的過期時(shí)間由最短的時(shí)間進(jìn)行決定,也就是說如果當(dāng)前消息的過期時(shí)間沒到,但是整個隊(duì)列的過期時(shí)間到了,那么隊(duì)列中的所有消息也自然就過期了,執(zhí)行過期的處理策略。
4、死信隊(duì)列
4.1、死信概念
死信隊(duì)列指的是死信交換機(jī),當(dāng)一條消息成為死信之后可以重新發(fā)送到另一個交換機(jī)進(jìn)行處理,而進(jìn)行處理的這個交換機(jī)就叫做死信交換機(jī)。

- 消息成為死信消息有幾種情況
隊(duì)列的消息長度達(dá)到限制
消費(fèi)者拒接消息的時(shí)候不把消息重新放入隊(duì)列中
隊(duì)列存在消息過期設(shè)置,消息超時(shí)未被消費(fèi)
消息存在過期時(shí)間,在投遞給消費(fèi)者時(shí)發(fā)現(xiàn)過期
在創(chuàng)建隊(duì)列時(shí)可以在配置中指定相關(guān)的信息,例如死信交換機(jī)、隊(duì)列長度等等,之后的一系列工作就不由程序員進(jìn)行操作了,MQ 會自己完成配置過的事件響應(yīng)。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">
<rabbit:queue-arguments>
<!-- 死信交換機(jī) -->
<entry key="x-dead-letter-exchange" value-type="dlx_exchane"/>
<!-- 路由 -->
<entry key="x-dead-letter-routing-key" value-type="dlx_routing"/>
<!-- 隊(duì)列過期時(shí)間 -->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
<!-- 隊(duì)列長度 -->
<entry key="x-max-length" value-type="java.lang.Integer" value="10"/>
</rabbit:queue-arguments>
</rabbit:queue>4.2、延遲隊(duì)列
延遲隊(duì)列指的是消息在進(jìn)入隊(duì)列后不會立即被消費(fèi),只有到達(dá)指定時(shí)間之后才會被消費(fèi),也就是需要有一個時(shí)間的判斷條件。
消息隊(duì)列實(shí)際上是沒有提供對延遲隊(duì)列的實(shí)現(xiàn)的,但是可以通過 TTL + 死信隊(duì)列 的方式完成,設(shè)置一個隊(duì)列,不被任何的消費(fèi)者所消費(fèi),所有的消息進(jìn)入都會被保存在里面,設(shè)置隊(duì)列的過期時(shí)間,一旦隊(duì)列過期將所有的消息過渡到綁定的死信隊(duì)列中。
再由具體的消費(fèi)者來消費(fèi)死信隊(duì)列中的消息,這樣就實(shí)現(xiàn)了延遲隊(duì)列的功能。
例如實(shí)現(xiàn)一個下單超時(shí)支付取消訂單的功能:

到此這篇關(guān)于java中RabbitMQ高級應(yīng)用的文章就介紹到這了,更多相關(guān)java RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Jenkins安裝多個jdk版本并在項(xiàng)目中選擇對應(yīng)jdk版本
在使用jenkins構(gòu)建項(xiàng)目時(shí)會遇到不同的job需要配置不同版本的jdk,下面這篇文章主要給大家介紹了關(guān)于Jenkins安裝多個jdk版本并在項(xiàng)目中選擇對應(yīng)jdk版本的相關(guān)資料,需要的朋友可以參考下2024-03-03
Mybatis update數(shù)據(jù)庫死鎖之獲取數(shù)據(jù)庫連接池等待
這篇文章主要介紹了Mybatis update數(shù)據(jù)庫死鎖之獲取數(shù)據(jù)庫連接池等待的相關(guān)資料,非常不錯,具有參考借鑒價(jià)值,需要的朋友可以參考下2016-07-07
IDEA 打開java文件對應(yīng)的class路徑的操作步驟
這篇文章主要介紹了IDEA 打開java文件對應(yīng)的class路徑的操作步驟,本文分步驟給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10
詳談Servlet和Filter的區(qū)別以及兩者在Struts2和Springmvc中的應(yīng)用
下面小編就為大家?guī)硪黄斦凷ervlet和Filter的區(qū)別以及兩者在Struts2和Springmvc中的應(yīng)用。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-08-08
Spring Boot 與 Kotlin 使用JdbcTemplate連接MySQL數(shù)據(jù)庫的方法
本文介紹在Spring Boot基礎(chǔ)下配置數(shù)據(jù)源和通過 JdbcTemplate 編寫數(shù)據(jù)訪問的示例。感興趣的朋友跟隨腳本之家小編一起學(xué)習(xí)吧2018-01-01
解決idea中maven新增的配置文件xx.xml沒生效問題
這篇文章主要介紹了如何解決idea中maven新增的配置文件xx.xml沒生效問題,公司項(xiàng)目有用自己的`私服,Maven正常去私服下載jar包是沒問題的,但阿里云鏡像找不到相關(guān)的jar包報(bào)錯,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2024-06-06
Springboot?通過FastJson實(shí)現(xiàn)bean對象和Json字符串互轉(zhuǎn)問題
這篇文章主要介紹了Springboot?通過FastJson實(shí)現(xiàn)bean對象和Json字符串互轉(zhuǎn),本文嘗試驗(yàn)證兩種場景給大家詳細(xì)介紹,對Springboot?FastJson實(shí)現(xiàn)bean和Json互轉(zhuǎn)問題,感興趣的朋友一起看看吧2022-08-08

