詳解RabbitMQ核心機制
1. MQ
1.1 MQ 概述
MQ,消息隊列,一種在分布式系統(tǒng)中用于通信的關(guān)鍵組件
本質(zhì)上是一個隊列,遵循 FIFO(先入先出)原則,隊列中存儲的內(nèi)容是消息(message)
消息可以非常簡單,比如只包含文本字符串或 JSON 數(shù)據(jù),也可以很復(fù)雜,如內(nèi)嵌對象。MQ 主要用于分布式系統(tǒng)之間的通信,解決數(shù)據(jù)傳遞的效率和可靠性問題
1.2 系統(tǒng)間通信方式
在分布式系統(tǒng)中,系統(tǒng)之間的調(diào)用通常有兩種方式:
1.同步通信:
直接調(diào)用對方的服務(wù),數(shù)據(jù)從一端發(fā)出后立即到達另一端。這種方式響應(yīng)快,但可能導(dǎo)致調(diào)用方阻塞,尤其在處理耗時操作時效率低下
2.異步通信:
數(shù)據(jù)從一端發(fā)出后,先進入一個容器進行臨時存儲,當滿足特定條件(如接收方準備好)時,再由容器轉(zhuǎn)發(fā)給另一端
MQ 就是這個容器的具體實現(xiàn),它解耦了發(fā)送方和接收方,提高了系統(tǒng)的靈活性和可擴展性
1.3 MQ的作用
MQ的核心工作是接收、存儲和轉(zhuǎn)發(fā)消息
1.3.1.異步解耦:
一些耗時操作(如發(fā)送注冊短信或郵件通知)不需要即時返回結(jié)果。MQ 可以將這些操作異步化。例如,用戶注冊后,系統(tǒng)立即返回注冊成功消息,同時將通知任務(wù)放入 MQ;MQ 在后臺異步處理通知,避免了用戶等待
這降低了系統(tǒng)耦合度,提升響應(yīng)速度。
1.3.2.流量削峰:
面對突發(fā)流量(如秒殺或促銷活動),系統(tǒng)可能因過載而崩潰,MQ 能緩沖請求,將峰值流量排隊處理
例如,在高并發(fā)場景下,請求先進入 MQ 隊列,系統(tǒng)根據(jù)自身處理能力逐步消費消息,防止資源耗盡
這避免了為處理峰值而過度投資資源,優(yōu)化了成本效率。
1.3.3.消息分發(fā):
當多個系統(tǒng)需要對同一數(shù)據(jù)做出響應(yīng)時,MQ 可實現(xiàn)高效的消息分發(fā)
例如,支付成功后,支付系統(tǒng)向 MQ 發(fā)送一條消息;其他系統(tǒng)(如訂單系統(tǒng)、庫存系統(tǒng))訂閱該消息,無需輪詢數(shù)據(jù)庫
這減少了冗余查詢,提高了數(shù)據(jù)一致性和系統(tǒng)性能。
1.3.4.延遲通知:
MQ 支持延遲消息功能,適用于在特定時間后觸發(fā)操作的場景
例如,在電子商務(wù)平臺中,用戶下單后未支付,系統(tǒng)將超時取消訂單的任務(wù)放入 MQ 延遲隊列;MQ 在指定時間(如下單后 30 分鐘)自動發(fā)送消息,觸發(fā)取消流程
這簡化了定時任務(wù)管理,提升了用戶體驗
1.4.RabbitMQ
RabbitMQ 是 MQ 的一種流行實現(xiàn),它基于 AMQP(高級消息隊列協(xié)議),提供了可靠的消息傳遞、隊列管理和路由功能。并能處理高吞吐量和復(fù)雜消息的路由需求
2.RabbitMQ 工作模式
RabbitMQ支持多種工作模式來處理消息的生產(chǎn)和消費。這些模式適用于不同場景,幫助實現(xiàn)高效、可靠的消息傳遞
2.1 Simple (簡單模式)
最基本的點對點模式。生產(chǎn)者(P)將消息發(fā)送到隊列(Queue),消費者(C)從隊列中取出消息。隊列充當緩存區(qū),確保消息在傳遞過程中不會丟失

一個生產(chǎn)者對應(yīng)一個消費者,每條消息只能被消費一次,簡單易用
適用場景:消息需要被單個消費者處理的場景,例如日志記錄
2.2 Work Queue (工作隊列模式)
擴展了簡單模式,消息被分發(fā)到不同的消費者,實現(xiàn)負載均衡

每個消費者接收不同的消息,同時支持并行處理,提高系統(tǒng)吞吐量
適用場景:集群環(huán)境中的異步任務(wù)處理,例如短信通知服務(wù):訂單消息發(fā)送到隊列,多個短信服務(wù)實例競爭消息并發(fā)送通知
2.3 Publish/Subscribe (發(fā)布/訂閱模式)
引入交換機,生產(chǎn)者發(fā)送消息到交換機,交換機將消息復(fù)制并廣播到所有綁定的隊列,每個隊列對應(yīng)一個消費者。

適用場景:消息需要被多個消費者同時接收的場景,例如實時通知或廣播消息
2.4 Routing(路由模式)
發(fā)布/訂閱模式的變種,增加路由鍵。生產(chǎn)者發(fā)送消息時指定RoutingKey,交換機根據(jù)BindingKey規(guī)則將消息篩選后路由到特定隊列

適用場景:需要根據(jù)特定規(guī)則分發(fā)消息的場景,例如將錯誤日志路由到專門的處理服務(wù)
2.5 Topics(通配符模式)
路由模式的升級版,支持通配符匹配RoutingKey。RoutingKey使用點分隔符(如"order.*"),交換機根據(jù)模式規(guī)則路由消息

適用場景:需要靈活匹配和過濾消息的場景,例如訂單系統(tǒng)中的多級分類
2.6 RPC(RPC通信模式)
實現(xiàn)遠程過程調(diào)用(RPC),生產(chǎn)者發(fā)送請求消息,消費者處理并返回響應(yīng)。通過兩個隊列(請求隊列和響應(yīng)隊列)模擬回調(diào)機制

適用場景:分布式系統(tǒng)中的遠程調(diào)用,例如微服務(wù)間的方法調(diào)用
2.7 Publisher Confirms(發(fā)布確認模式)
確保消息可靠發(fā)送到RabbitMQ服務(wù)器的機制。生產(chǎn)者將通道設(shè)置為confirm模式后,每條消息獲得唯一ID,服務(wù)器異步發(fā)送確認(ACK)表示消息已接收
適用場景:對數(shù)據(jù)安全性要求高的場景,例如金融交易或訂單處理(如支付系統(tǒng))
3. RabbitMQ 的實現(xiàn)
3.1.Spring AMQP
Spring 提供了RabbitMQ 開發(fā)的封裝,Spring AMQP通過集成Spring生態(tài),大幅簡化了消息隊列的實現(xiàn)
3.1.1.引入依賴
<!--Spring MVC相關(guān)依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--RabbitMQ相關(guān)依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>3.1.2.配置 Constants
public class Constants {
public static final String WORK_QUEUE = "WORK_QUEUE";
public static final String FANOUT_QUEUE1 = "Fanout_QUEUE1";
public static final String FANOUT_QUEUE2 = "Fanout_QUEUE2";
public static final String FANOUT_EXCHANGE = "Fanout_EXCHANGE";
public static final String DIRECT_QUEUE1 = "DIRECT_QUEUE1";
public static final String DIRECT_QUEUE2 = "DIRECT_QUEUE2";
public static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";
public static final String TOPIC_QUEUE1 = "TOPIC_QUEUE1";
public static final String TOPIC_QUEUE2 = "TOPIC_QUEUE2";
public static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE";
}3.1.3.配置 Config
@Configuration
public class RabbitMQConfig {
//work_queue
@Bean("workQueue")
public Queue workQueue(){
return QueueBuilder.durable(Constants.WORK_QUEUE).build();
}
//fanout_queue
@Bean("fanoutQueue1")
public Queue fanoutQueue1(){
return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2(){
return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
}
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).build();
}
@Bean("bindingFanoutQueue1")
public Binding bindingFanoutQueue1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue1") Queue queue){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean("bindingFanoutQueue2")
public Binding bindingFanoutQueue2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue2") Queue queue){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
// direct_queue
@Bean("directQueue1")
public Queue directQueue1(){
return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}
@Bean("directQueue2")
public Queue directQueue2(){
return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}
@Bean("directExchange")
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).build();
}
@Bean("bindingDirectQueue1")
public Binding bindingDirectQueue1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){
return BindingBuilder.bind(queue).to(directExchange()).with("a");
}
@Bean("bindingDirectQueue2")
public Binding bindingDirectQueue2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
return BindingBuilder.bind(queue).to(directExchange()).with("b");
}
@Bean("bindingDirectQueue3")
public Binding bindingDirectQueue3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
return BindingBuilder.bind(queue).to(directExchange()).with("c");
}
@Bean("topicQueue1")
public Queue topicQueue1(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
}
@Bean("topicQueue2")
public Queue topicQueue2(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
}
@Bean("topicExchange")
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).build();
}
@Bean("bindingTopicQueue1")
public Binding bindingTopicQueue1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.x.*");
}
@Bean("bindingTopicQueue2")
public Binding bindingTopicQueue2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.*.y");
}
@Bean("bindingTopicQueue3")
public Binding bindingTopicQueue3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("xy'.#");
}
}可以通過配置 Config 一次性大量的聲明,隊列、交換機、綁定關(guān)系等,大幅度縮減了頻繁創(chuàng)建文件的次數(shù)
3.1.4.ProducerController
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String work(){
for (int i = 0; i <10 ; i++) {
rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"hello work queue" + i);
}
return "發(fā)送成功";
}
@RequestMapping("/fanout")
public String fanout(){
for (int i = 0; i <10 ; i++) {
rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello fanout queue" + i);
}
return "發(fā)送成功";
}
@RequestMapping("/direct/{routingKey}")
public String direct(@PathVariable("routingKey") String routingKey){
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"hello direct this is routingKey " + routingKey);
return "發(fā)送成功";
}
@RequestMapping("/topic/{routingKey}")
public String topic(@PathVariable("routingKey") String routingKey){
rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"hello topic " + routingKey);
return "發(fā)送成功";
}
}這是一個基于Spring Boot的RabbitMQ消息生產(chǎn)者控制器(ProducerController),用于向RabbitMQ消息隊列發(fā)送消息。它實現(xiàn)了四種常見的消息隊列模式,通過HTTP接口觸發(fā)消息發(fā)送
在實際應(yīng)用中,它可以作為微服務(wù)架構(gòu)中的生產(chǎn)者模塊,用于解耦系統(tǒng)組件、實現(xiàn)異步處理或事件驅(qū)動架構(gòu)
3.1.5.WorkListener
@Component
public class WorkListener {
@RabbitListener( queues = Constants.WORK_QUEUE )
public void workListener1(String message) {
System.out.println("隊列["+Constants.WORK_QUEUE+"] 接收到消息:" + message);
}
@RabbitListener( queues = Constants.WORK_QUEUE )
public void workListener2(String message) {
System.out.println("隊列["+Constants.WORK_QUEUE+"] 接收到消息:" + message);
}
}這是一個 Spring 組件,用于實現(xiàn)RabbitMQ的并行消費功能
@RabbitListener 不僅可用于方法,還可用于類級別。當標注在處理方法時如上圖代碼所示,當標注在類時,需要搭配 @RabbitHandler 使用,將 @RabbitHandler 標注在類的方法上
3.1.6.FanoutListener
@Component
public class FanoutListener {
@RabbitListener(queues = Constants.FANOUT_QUEUE1)
public void fanoutListener1(String message){
System.out.println("隊列["+Constants.FANOUT_QUEUE1+"] 接收到消息:" + message);
}
@RabbitListener(queues = Constants.FANOUT_QUEUE2)
public void fanoutListener2(String message){
System.out.println( "隊列["+Constants.FANOUT_QUEUE2+"] 接收到消息:" + message);
}
}FanoutListener 是一個基于 Spring AMQP 的消息消費者組件,專門用于處理 ?Fanout 類型交換機的消息。它通過@RabbitListener注解監(jiān)聽兩個不同的隊列,實現(xiàn) ?廣播模式? 的消息消費
3.1.7.DirectListener
@Component
public class DirectListener {
@RabbitListener(queues = Constants.DIRECT_QUEUE1)
public void queueListener1(String msg) throws InterruptedException {
System.out.println("隊列["+Constants.DIRECT_QUEUE1+"] 接收到消息:" + msg);
}
@RabbitListener(queues = Constants.DIRECT_QUEUE2)
public void queueListener2(String msg) throws InterruptedException {
System.out.println("隊列["+Constants.DIRECT_QUEUE2+"] 接收到消息:" + msg);
}
}DirectListener專門用于處理 ?Direct 類型交換機的消息,實現(xiàn)?路由鍵精準匹配?的消息分發(fā)模式,實現(xiàn)原理上同
3.1.8.TopicsListener
public class TopicsListener {
@RabbitListener(queues = Constants.TOPIC_QUEUE1)
public void topicListener1(String message){
System.out.println( "隊列["+Constants.TOPIC_QUEUE1+"] 接收到消息:" + message);
}
@RabbitListener(queues = Constants.TOPIC_QUEUE2)
public void topicListener2(String message){
System.out.println( "隊列["+Constants.TOPIC_QUEUE2+"] 接收到消息:" + message);
}
}TopicsListener專門用于處理 ?Topic 類型交換機的消息,實現(xiàn) ?通配符路由? 的靈活消息分發(fā)模式
總結(jié):
Spring AMQP:基于Spring框架的抽象層,提供聲明式配置(如注解驅(qū)動),簡化了消息生產(chǎn)、消費和資源管理。它封裝了原生API,減少了樣板代碼,支持與Spring Boot無縫集成。適用于快速開發(fā)、維護性要求高的企業(yè)應(yīng)用
4.RabbitMQ 高級特性
4.1 消息確認
消息確認用于確保消息被消費者正確處理,防止消息丟失或重復(fù)處理。它通過讓消費者向生產(chǎn)者或代理發(fā)送確認信號來實現(xiàn)
4.1.1.自動確認
消息代理在將消息傳遞給消費者后,將立即自動確認,無需消費者干預(yù)
這種方式簡單高效,但風險較高:如果消費者處理消息時崩潰,消息可能丟失,因為沒有重試機制。
4.1.2手動確認
消費者在處理消息后,需要顯式發(fā)送確認信號給消息代理
這種方式提供了更高的控制性,確保消息只有在成功處理后才被標記為完成。如果處理失敗,消費者可以選擇重新入隊或丟棄消息
4.1.2.1.手動確認步驟
1.消費者訂閱消息:消費者連接到隊列,并聲明需要手動確認模式。
2.處理消息:消費者接收消息并執(zhí)行業(yè)務(wù)邏輯(如數(shù)據(jù)處理或存儲)。
3.發(fā)送確認信號:如果處理成功,消費者發(fā)送一個確認(ACK)信號給代理;如果失敗,發(fā)送否定確認(NACK)信號,讓消息重新入隊或丟棄。
4.代理響應(yīng):代理收到ACK后,從隊列中刪除消息;收到NACK后,根據(jù)配置重試或移至死信隊列
4.1.2.2.代碼實現(xiàn)
在 application.yml 中配置相關(guān)屬性
rabbitmq:
listener:
simple:
acknowledge-mode: autoController
@RestController
@RequestMapping("/ack")
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/auto")
public String auto() {
for (int i = 0; i < 1; i++) {
rabbitTemplate.convertAndSend(Constants.AKC_EXCHANGE_AUTO,"abc","auto 發(fā)送消息:" + i);
}
return "auto success";
}
@RequestMapping("/none")
public String none() {
for (int i = 0; i < 1; i++) {
rabbitTemplate.convertAndSend(Constants.AKC_EXCHANGE_NONE,"abc","none 發(fā)送消息:" + i);
}
return "none success";
}
@RequestMapping("/manual")
public String manual() {
for (int i = 0; i < 1; i++) {
rabbitTemplate.convertAndSend(Constants.AKC_EXCHANGE_MANUAL,"abc","manual 發(fā)送消息:" + i);
}
return "manual success";
}NoneListener
@Component
public class NoneListener {
@RabbitListener(queues = Constants.AKC_QUEUE_NONE)
public void listen(Message message, Channel channel) throws Exception {
System.out.println("接收到消息: "+ new String(message.getBody(),"UTF-8")+
"deliveryTag: " + message.getMessageProperties().getDeliveryTag() );
System.out.println("auto 業(yè)務(wù)邏輯處理完畢");
}
}AutoListener
@Component
public class AutoListener {
@RabbitListener(queues = Constants.AKC_QUEUE_AUTO)
public void listen(Message message, Channel channel) throws Exception {
System.out.println("接收到消息: "+ new String(message.getBody(),"UTF-8")+
"deliveryTag: " + message.getMessageProperties().getDeliveryTag() );
System.out.println("auto 業(yè)務(wù)邏輯處理完畢");
}
}消費端不確認收到消息時會自動重復(fù)消息入隊
4.2.持久性
在 RabbitMQ 中,持久性是確保消息在服務(wù)意外停止或重啟后不丟失的關(guān)鍵機制。它通過交換器、隊列和消息三部分的持久化來實現(xiàn)
4.2.1.交換機持久化
在 Spring AMQP 中,交換器是消息路由的入口。如果交換器不持久化,RabbitMQ服務(wù)重啟后,其元數(shù)據(jù)(如名稱、類型)會丟失,導(dǎo)致消息無法被正確路由
實現(xiàn)時,在聲明交換器時將 durable 參數(shù)設(shè)為true
Exchange exchange = ExchangeBuilder.topicExchange("myExchange").durable(true).build();4.2.2.隊列持久化
在 Spring AMQP 中,隊列是消息存儲的容器。如果隊列不持久化,服務(wù)重啟后隊列會被刪除,所有消息也會丟失
實現(xiàn)時,在聲明隊列時將 durable 參數(shù)設(shè)為true。也就是在聲明隊列時,使用 .durable() 來使隊列持久化
Queue queue = QueueBuilder.durable("myQueue").build(); // 默認durable=true
4.2.3.消息持久化
消息本身需要顯式設(shè)置為持久化,否則即使隊列持久化,消息也可能在重啟后丟失
4.2.3.1.RabbitMQ 客戶端實現(xiàn)
在 RabbitMQ 客戶端中,可以通過設(shè)置 MessageProperties.PERSISTENT_TEXT_PLAIN ,將這個參數(shù)傳入 channel.basicPublish() 中 完成消息持久化,當然 隊列持久化是消息持久化的前提。
String messageContent = "This is a persistent message";
// 2. 設(shè)置消息屬性為持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
messageContent.getBytes());MessageProperties.PERSISTENT_TEXT_PLAIN 是庫 com.rabbitmq.client.MessageProperties 類中的一個靜態(tài)常量,不需要用戶手動編寫代碼,用于設(shè)置消息為持久化模式。
4.2.3.2.RabbitTemplate 實現(xiàn)
如果使用 RabbitTemplate 發(fā)送持久化消息,代碼如下:
public class test {
public static void main(String[] args) {
String message = "This is a persistent message";
Message messageObject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);
}
}Message :
屬于org.springframework.amqp.core.Message 類, 是Spring AMQP框架中的核心類,用于封裝一條AMQP消息
它表示發(fā)送或接收的消息實體,包含消息體(payload)和消息屬性(如頭部信息、路由鍵等)。主要作用包括:
存儲消息的原始字節(jié)數(shù)據(jù)(body)
提供訪問和修改消息元數(shù)據(jù)的接口(通過 MessageProperties)
在RabbitMQ客戶端和服務(wù)器之間傳輸消息時,確保數(shù)據(jù)結(jié)構(gòu)的標準化
message.getBytes:將字符串消息轉(zhuǎn)換為字節(jié)數(shù)組,作為消息的實際內(nèi)容(body),以便RabbitMQ處理。消息體必須是字節(jié)數(shù)組,因為AMQP協(xié)議支持二進制數(shù)據(jù)傳輸
new MessageProperties():初始化消息的屬性對象,用于設(shè)置消息的元數(shù)據(jù)(如頭部、優(yōu)先級、持久化模式等)
4.3.發(fā)送方確認
在使用RabbitMQ時,消息持久化可以防止服務(wù)器崩潰導(dǎo)致的消息丟失,但如果消息在傳輸過程中丟失(例如RabbitMQ重啟期間生產(chǎn)者投遞失?。?,消息根本未到達服務(wù)器,持久化也無法解決
RabbitMQ提供了confirm機制(發(fā)送方確認)來確保生產(chǎn)者知道消息是否成功到達Exchange。相比事務(wù)機制,confirm機制性能更高,是實際工作中的首選方案
Confirm 機制允許生產(chǎn)者設(shè)置一個回調(diào)監(jiān)聽 (ConfirmCallback)。無論消息是否到達 Exchange,RabbitMQ 都會觸發(fā)回調(diào):
1.如果消息成功到達Exchange,回調(diào)返回 ack = true。
2.如果消息未到達Exchange(例如網(wǎng)絡(luò)故障或Exchange不存在),回調(diào)返回 ack = false,并提供失敗原因(cause)。
該機制僅確認消息是否到達 Exchange,不保證消息被 Queue 處理(后續(xù)需結(jié)合return退回模式處理Queue級錯誤)
4.3.1.配置 Confirm 機制
listener:
simple:
acknowledge-mode: manual
publisher-confirm-type: correlated4.3.2.Confirm 代碼實現(xiàn)
1.先創(chuàng)建 RabbitTemplate Bean,讓新創(chuàng)建的 RabbitTemplate 實現(xiàn) ConfirmCallback 接口,使其替代默認的 RabbitTemplate
2.重寫 ConfirmCallback 接口內(nèi)的 confirm 方法,以此來實現(xiàn) Confirm 機制
3.完善 confirm 內(nèi)的邏輯,如果為 ack 即 Exchange 收到消息,完善相應(yīng)邏輯;如果為 false 可以打印失敗 cause 來完善業(yè)務(wù)邏輯
@RequestMapping("/confirm")
public String confirm() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("Confirm 生產(chǎn)端發(fā)送成功" );
if(b){
System.out.printf("Exchange 接收到消息 ,消息ID: %s \n",correlationData == null? null : correlationData.getId());
}else {
System.out.printf("Exchange 未接收到消息,消息ID:%s,cause:%s\n",correlationData == null? null : correlationData.getId()
,s);
}
}
});
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE+1,"abc","confirm.test",correlationData);
return "confirm success";
}但RabbitTemplate 是單例對象,所以存在兩個問題。
1.在 confirm 中設(shè)置 RabbitTemplate 會影響所有使用 RabbitTemplate 的方法
2.重復(fù)調(diào)用接口會提示錯誤
可以直接創(chuàng)建一個新的 RabbitTemplate 類,但是需要創(chuàng)建一個原本的 rabbitTemplate 給其他方法調(diào)用,修改完的 RabbitTemplate 代碼如下:
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("Confirm 生產(chǎn)端發(fā)送成功" );
if(b){
System.out.printf("Exchange 接收到消息 ,消息ID: %s \n" , correlationData == null? null : correlationData.getId());
}else {
System.out.printf("Exchange 未接收到消息,消息ID:%s , cause: %s\n",correlationData == null? null : correlationData.getId()
,s);
}
}
});
return rabbitTemplate;
}
}4.4.重試機制
RabbitMQ 的重試機制基于消息確認模式:
自動確認模式:RabbitMQ在消息投遞給消費者后自動確認。如果消費者處理失?。ㄈ鐠伋霎惓#?,RabbitMQ會根據(jù)配置參數(shù)自動重試消息。例如,設(shè)置重試次數(shù) n,RabbitMQ會負責重發(fā)消息 n 次。
手動確認模式:消費者需顯式調(diào)用確認方法。如果處理失敗,應(yīng)用程序可選擇是否重試(通過設(shè)置 requeue 參數(shù))。這給予應(yīng)用更多控制權(quán),但重試邏輯需由開發(fā)者實現(xiàn)。
重試機制配置
rabbitmq:
listener:
simple:
acknowledge-mode: auto/manual
retry:
enabled: true # 開啟重試機制
initial-interval: 5000ms # 初始重試間隔,例如5秒
max-attempts: 5 # 最大重試次數(shù)(包括首次消自動確認重試模式
rabbitmq:
listener:
simple:
acknowledge-mode: auto
retry:
enabled: true # 開啟重試機制
initial-interval: 5000ms # 初始重試間隔,例如5秒
max-attempts: 5 # 最大重試次數(shù)(包括首次消費)沒開啟重試機制之前,RabbitMQ 能自動的將處理失敗的消息一直重新入隊也不會拋出異常。
重試機制開啟之后,會根據(jù)設(shè)置的次數(shù)重新入隊,當設(shè)置的次數(shù)耗盡也沒有解決問題時,就會拋出異常。
4.5.TTL
TTL, 過期時間,單位為毫秒(ms)是RabbitMQ中用于控制消息或隊列生命周期的機制。
當消息或隊列超過設(shè)定的存活時間后,未被消費的消息會被自動清除。
這適用于電商訂單超時取消(如24小時未付款自動取消訂單)或退款超時處理(如7天未處理自動退款)等場景
TTL為 0 表示消息必須立即投遞給消費者,否則被丟棄;未設(shè)置TTL表示消息永不過期
4.5.1.設(shè)置消息的 TTL
1.創(chuàng)建 MessagePostProcessor 類對象,重寫 postProcessMessage 方法
2.在重寫的方法里通過 message.getMessageProperties().setExpiration("10000") 設(shè)置過期時間,單位毫秒
3.把該 MessagePostProcessor 類對象作為參數(shù)傳輸?shù)?rabbitTemplate.convertAndSend。
@RequestMapping("/ttlMessage")
public String ttlMessage(){
System.out.println("ttl......");
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
};
System.out.println("ttl2 .....");
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"abc","ttl",messagePostProcessor);
return "ttlMessage success";
}4.5.2.設(shè)置隊列的 TTL
只需要在聲明隊列時加上 .ttl() 就行
@Bean("ttlQueue")
public Queue ttlQueue(){
return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(10000).build();
}4.5.3.最短TTL共享
消息隊列會掃描當前隊列中所有消息的TTL值,并選擇最小的TTL作為隊列的全局TTL。這意味著所有消息的過期時間會受到最短TTL的約束
4.6.死信
死信, 指因某些原因無法被消費的消息
當消息在一個隊列中變成死信后,會被重新發(fā)送到另一個交換器,綁定該交換器的隊列稱為死信隊列
消息變成死信的常見情況包括
- 消息被拒絕(Basic.Reject/Basic.Nack)且 requeue 參數(shù)為 false。
- 消息過期。
- 隊列達到最大長度。通過 .maxLength
當聲明隊列時,可以在隊列加上 .deadLetterExchange() 參數(shù)綁定死信交換機,接著加上 .deadLetterRoutingKey() 參數(shù)指定路由給 死信交換機 所綁定的死信隊列
4.7.延遲隊列
延遲隊列(Delayed Queue)是一種消息傳遞機制,消息在發(fā)送后不會立即被消費者獲取,而是等待特定時間后才可被消費
這在許多場景中非常有用,例如智能家居:用戶指令在指定時間后執(zhí)行,或是日常管理:會議前15分鐘自動提醒。
RabbitMQ本身不直接支持延遲隊列,但可以通過TTL和死信隊列組合模擬實現(xiàn)。然而,這種方式存在局限性,尤其在處理不同延遲時間的消息時

TTL機制允許設(shè)置消息的存活時間(單位:毫秒)。當消息過期時,它會被路由到死信隊列,消費者從死信隊列消費消息,實現(xiàn)延遲效果。關(guān)鍵步驟包括:
聲明一個正常隊列,并綁定到死信交換器。
生產(chǎn)者發(fā)送消息時,設(shè)置消息的TTL(例如10秒或20秒)。
消費者監(jiān)聽死信隊列,消費過期消息。
4.7.1.延遲隊列問題
當隊列中包含不同TTL的消息時,RabbitMQ只檢查隊首消息的過期時間。如果隊首消息的TTL較長,后續(xù)短TTL消息會被阻塞,直到隊首消息過期。
原因在于RabbitMQ的隊列設(shè)計:
隊列檢查原則:RabbitMQ只監(jiān)控隊列頭部(第一個消息)的過期時間。只有當頭部消息過期或被消費后,才會檢查下一個消息。
TTL計算起點:消息的TTL是從它被發(fā)布到隊列時開始計算的(絕對時間),但RabbitMQ只在處理頭部時“發(fā)現(xiàn)”過期。
潛在問題:如果頭部消息的TTL較長,它會阻塞后續(xù)消息的過期檢查,即使后續(xù)消息的TTL更短。這可能導(dǎo)致短TTL消息延遲過期
解決方案一:為不同延遲時間創(chuàng)建獨立隊列
要解決此問題,核心思路是避免混合不同TTL的消息在同一個隊列中。RabbitMQ官方推薦為每個延遲時間創(chuàng)建獨立的隊列。這樣,每個隊列只處理單一TTL,確保過期檢查的準確性。
1.設(shè)計多個隊列:為每個需支持的延遲時間(如10秒、20秒)創(chuàng)建獨立的正常隊列。
每個隊列綁定到同一個死信交換器。
設(shè)置隊列的.deadLetterExchange() 參數(shù) 和 .deadLetterRoutingKey()
2.生產(chǎn)者發(fā)送消息:根據(jù)消息的延遲需求,發(fā)送到對應(yīng)的隊列。
3.消費者統(tǒng)一監(jiān)聽死信隊列:死信隊列接收所有過期消息,消費者無需修改。
解決方案二:添加插件
通過下載插件, 添加完插件之后再創(chuàng)建延遲隊列,只需要聲明延遲交換機時添加 .delayed() 方法,設(shè)置消息時使用 message.setDelayLong() 指定延遲時間,無需額外設(shè)置過期時間或創(chuàng)建死信隊列來完成延遲隊列的功能。
4.8.消息分發(fā)
當隊列有多個消費者時,RabbitMQ默認采用輪詢策略分發(fā)消息。每條消息只發(fā)給一個消費者,但分發(fā)順序固定,不考慮消費者處理速度。例如:
消費者A處理速度快(每秒10條消息)。
消費者B處理速度慢(每秒2條消息)。
結(jié)果:消費者B可能積壓消息,消費者A空閑,整體吞吐量下降。
因此, RabbitMQ提供 channel.basicQos(prefetchCount) 方法來解決負載不均衡問題。其工作原理類似于TCP/IP的“滑動窗口”機制
prefetchCount:設(shè)置消費者允許的最大未確認消息數(shù)量(整數(shù))。例如,(prefetchCount) = 5 表示每個消費者最多持有5條未處理消息。
計數(shù)機制:RabbitMQ發(fā)送消息時計數(shù)+1,消費者確認消息后計數(shù)-1。當計數(shù)達 prefetchCount 上限時,RabbitMQ暫停向該消費者發(fā)送新消息。
特殊值 prefetchCount = 0 表示無上限(恢復(fù)默認輪詢)。
可以通過設(shè)置 application.yml 中的參數(shù)來設(shè)置 prefetchCount 的大小
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手動確認模式
prefetch: 5 # 設(shè)置 prefetchCount = 54.9.冪等性保障
冪等性是計算機科學中的關(guān)鍵概念,尤其在分布式系統(tǒng)和消息隊列中至關(guān)重要。它確保操作被多次執(zhí)行時,對系統(tǒng)狀態(tài)的影響保持一致,從而避免重復(fù)處理導(dǎo)致的數(shù)據(jù)不一致
RabbitMQ的傳輸保障模式:
最多一次:消息發(fā)送后,如果處理失?。ㄈ缦M者崩潰、Broker 傳輸給消費者失敗),消息可能丟失,但不會重復(fù)。這適合對消息丟失不敏感的場景,比如日志記錄。
最少一次:消息保證至少被處理一次,不會丟失。但如果處理失敗,RabbitMQ會重試發(fā)送消息,這可能導(dǎo)致消息被多次處理(即重復(fù))。這是RabbitMQ的默認模式。比如,通過發(fā)布者確認、消費者確認、Broker 持久化等來保證 “最少一次” 的功能實現(xiàn)。
恰好一次:RabbitMQ無法原生支持,因為分布式系統(tǒng)中存在網(wǎng)絡(luò)故障、節(jié)點失敗等問題,很難保證消息只傳輸一次。實現(xiàn)“恰好一次”需要復(fù)雜的機制,通常由應(yīng)用層自己處理
在 MQ 中,冪等性指同一條消息被多次消費時,對系統(tǒng)的影響一致。RabbitMQ 支持“最多一次”和“最少一次”的傳輸保障,無法實現(xiàn)“恰好一次”。RabbitMQ的 “最少一次” 模式通過重試保證消息不丟失
4.9.1.解決方案
解決冪等性的核心是為消息添加唯一標識,并確保消費前檢查狀態(tài)。以下是常用方法:
全局局唯一 ID:
1.為每條消息分配唯一 ID(如 UUID 或 RabbitMQ 自帶 ID),消費者處理前檢查該 ID 是否已消費。
2.使用 Redis 的原子操作 SETNX:將消息 ID 作為 key,執(zhí)行 SETNX messageID 1。如果返回 1(key 不存在),則正常消費;返回 0(key 存在),則丟棄消息
業(yè)務(wù)邏輯判斷:
在業(yè)務(wù)層檢查數(shù)據(jù)狀態(tài),例如:
- 查詢數(shù)據(jù)庫記錄是否存在(如支付訂單時驗證是否已處理)。
- 使用樂觀鎖機制:更新數(shù)據(jù)前檢查版本號,確保操作只執(zhí)行一次。
- 狀態(tài)機控制:業(yè)務(wù)對象定義狀態(tài)(如“未處理”到“已完成”),僅當狀態(tài)匹配時才處理。
到此這篇關(guān)于RabbitMQ核心機制的文章就介紹到這了,更多相關(guān)RabbitMQ機制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java NIO:淺析IO模型_動力節(jié)點Java學院整理
在進入Java NIO編程之前,我們今天先來討論一些比較基礎(chǔ)的知識:I/O模型。對java io nio相關(guān)知識感興趣的朋友一起學習吧2017-05-05
JSON數(shù)據(jù)轉(zhuǎn)換成Java對象的方法
就目前來講,將Java對象轉(zhuǎn)換成JSON對象還是相當簡單的,但是 將JSON對象轉(zhuǎn)換成Java對象,就相對比較復(fù)雜了些2014-03-03

