SpringBoot+RabbitMQ實(shí)現(xiàn)消息可靠傳輸詳解
環(huán)境配置
SpringBoot 整合 RabbitMQ 實(shí)現(xiàn)消息的發(fā)送。
1.添加 maven 依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.添加 application.yml 配置文件
spring:
rabbitmq:
host: 192.168.3.19
port: 5672
username: admin
password: xxxx
3.配置交換機(jī)、隊(duì)列以及綁定
@Bean
public DirectExchange myExchange() {
DirectExchange directExchange = new DirectExchange("myExchange");
return directExchange;
}
@Bean
public Queue myQueue() {
Queue queue = new Queue("myQueue");
return queue;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
}
4.生產(chǎn)發(fā)送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send(String message) {
rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
System.out.println("【發(fā)送消息】" + message)
return "【send message】" + message;
}
5.消費(fèi)者接收消息
@RabbitListener(queuesToDeclare = @Queue("myQueue"))
public void process(String msg, Channel channel, Message message) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
String time = sdf.format(date);
System.out.println("【接收信息】" + msg + " 當(dāng)前時(shí)間" + time);
6.調(diào)用生產(chǎn)端發(fā)送消息 hello,控制臺(tái)輸出:
【發(fā)送消息】hello
【接收信息】hello 當(dāng)前時(shí)間2022-05-12 10:21:14
說明消息已經(jīng)被成功接收。
消息丟失分析

一條消息的從生產(chǎn)到消費(fèi),消息丟失可能發(fā)生在以下幾個(gè)階段:
- 生產(chǎn)端丟失: 生產(chǎn)者無法傳輸?shù)?nbsp;
RabbitMQ - 存儲(chǔ)端丟失:
RabbitMQ存儲(chǔ)自身掛了 - 消費(fèi)端丟失:存儲(chǔ)由于網(wǎng)絡(luò)問題,無法發(fā)送到消費(fèi)端,或者消費(fèi)掛了,無法發(fā)送正常消費(fèi)
RabbitMQ 從生產(chǎn)端、儲(chǔ)存端、消費(fèi)端都對可靠性傳輸做很好的支持。
生產(chǎn)階段
生產(chǎn)階段通過請求確認(rèn)機(jī)制,來確保消息的可靠傳輸。當(dāng)發(fā)送消息到 RabbitMQ 服務(wù)器 之后,RabbitMQ 收到消息之后,給發(fā)送返回一個(gè)請求確認(rèn),表示RabbitMQ 服務(wù)器已成功的接收到了消息。
配置application.yml
spring:
rabbitmq:
# 消息確認(rèn)機(jī)制 生產(chǎn)者 -> 交換機(jī)
publisher-confirms: true
# 消息返回機(jī)制 交換機(jī) -> 隊(duì)列
publisher-returns: true
配置
@Configuration
@Slf4j
public class RabbitConfig {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("【correlationData】:" + correlationData);
log.info("【ack】" + ack);
log.info("【cause】" + cause);
if (ack) {
log.info("【發(fā)送成功】");
} else {
log.info("【發(fā)送失敗】correlationData:" + correlationData + " cause:" + cause);
}
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.warn("【消息發(fā)送失敗】");
log.info("【message】" + message);
log.info("【replyCode】" + replyCode);
}
});
return rabbitTemplate;
}
}
消息從 生產(chǎn)者 到 交換機(jī), 有confirmCallback 確認(rèn)模式。發(fā)送消息成功后消息會(huì)調(diào)用方法confirm(CorrelationData correlationData, boolean ack, String cause),根據(jù) ack 判斷消息是否發(fā)送成功。
消息從 交換機(jī) 到 隊(duì)列,有returnCallback 退回模式。
發(fā)送消息 product message 控制臺(tái)輸出如下:
【發(fā)送消息】product message
【接收信息】product message 當(dāng)前時(shí)間2022-05-12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【發(fā)送成功】
生產(chǎn)端模擬消息丟失
這里有兩個(gè)方案:
- 發(fā)送消息后立馬關(guān)閉 broke,后者把網(wǎng)絡(luò)關(guān)閉,但是broker關(guān)閉之后控制臺(tái)一直就會(huì)報(bào)錯(cuò),發(fā)送消息也報(bào)500錯(cuò)誤。
- 發(fā)送不存在的交換機(jī):
// myExchange 修改成 myExchangexxxxx
rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
結(jié)果:
【correlationData】:null
【ack】false
【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
【發(fā)送失敗】
當(dāng)發(fā)送失敗可以對消息進(jìn)行重試
交換機(jī)正確,發(fā)送不存在的隊(duì)列:
交換機(jī)接收到消息,返回成功通知,控制臺(tái)輸出:
【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
【ack】true
【cause】null
【發(fā)送成功】
交換機(jī)沒有找到隊(duì)列,返回失敗信息:
【消息發(fā)送失敗】
【message】product message
【replyCode】312
RabbitMQ
開啟隊(duì)列持久化,創(chuàng)建的隊(duì)列和交換機(jī)默認(rèn)配置是持久化的。首先把隊(duì)列和交換機(jī)設(shè)置正確,修改消費(fèi)監(jiān)聽的隊(duì)列,使得消息存放在隊(duì)列里。
修改隊(duì)列的持久化,修改成非持久化:
@Bean
public Queue myQueue() {
Queue queue = new Queue("myQueue",false);
return queue;
}
發(fā)送消息之后,消息存放在隊(duì)列中,然后重啟 RabbitMQ,消息不存在了。
設(shè)置隊(duì)列持久化:
@Bean
public Queue myQueue() {
Queue queue = new Queue("myQueue",true);
return queue;
}
重啟之后,隊(duì)列的消息還存在。
消費(fèi)端
消費(fèi)端默認(rèn)開始 ack 自動(dòng)確認(rèn)模式,當(dāng)隊(duì)列消息被消費(fèi)者接收,不管有沒有被消費(fèi)端消息,都自動(dòng)刪除隊(duì)列中的消息。所以為了確保消費(fèi)端能成功消費(fèi)消息,將自動(dòng)模式改成手動(dòng)確認(rèn)模式:
修改application.yml 文件
spring:
rabbitmq:
# 手動(dòng)消息確認(rèn)
listener:
simple:
acknowledge-mode: manual
消費(fèi)接收消息之后需要手動(dòng)確認(rèn):
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
@RabbitListener(queuesToDeclare = @Queue("myQueue"))
public void process(String msg, Channel channel, Message message) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
String time = sdf.format(date);
System.out.println("【接收信息】" + msg + " 當(dāng)前時(shí)間" + time);
System.out.println(message.getMessageProperties().getDeliveryTag());
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}
如果不添加:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
發(fā)送兩條消息
消息被接收后,沒有確認(rèn),重新放到隊(duì)列中:

重啟項(xiàng)目,之后,隊(duì)列的消息會(huì)發(fā)送到消費(fèi)者,但是沒有 ack 確認(rèn),還是繼續(xù)會(huì)放回隊(duì)列中。
加上 channel.basicAck 之后,再重啟項(xiàng)目

隊(duì)列消息就被刪除了
basicAck 方法最后一個(gè)參數(shù) multiple 表示是刪除之前的隊(duì)列。
multiple 設(shè)置成 true,把后面的隊(duì)列都清理掉了

到此這篇關(guān)于SpringBoot+RabbitMQ實(shí)現(xiàn)消息可靠傳輸詳解的文章就介紹到這了,更多相關(guān)SpringBoot RabbitMQ消息可靠傳輸內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot整合Rabbitmq之Confirm和Return機(jī)制
- Spring?Boot+RabbitMQ?通過fanout模式實(shí)現(xiàn)消息接收功能(支持消費(fèi)者多實(shí)例部署)
- 詳解SpringBoot整合RabbitMQ如何實(shí)現(xiàn)消息確認(rèn)
- SpringBoot整合RabbitMQ實(shí)現(xiàn)交換機(jī)與隊(duì)列的綁定
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- springboot+rabbitmq實(shí)現(xiàn)智能家居實(shí)例詳解
- Spring boot Rabbitmq消息防丟失實(shí)踐
相關(guān)文章
Java編寫程序之輸入一個(gè)數(shù)字實(shí)現(xiàn)該數(shù)字階乘的計(jì)算
這篇文章主要介紹了Java編寫程序之輸入一個(gè)數(shù)字實(shí)現(xiàn)該數(shù)字階乘的計(jì)算,本文通過實(shí)例代碼給大家介紹的非常想詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02
Seata?AT獲取數(shù)據(jù)表元數(shù)據(jù)源碼詳解
這篇文章主要為大家介紹了Seata?AT獲取數(shù)據(jù)表元數(shù)據(jù)源碼詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11
使用Spring Boot實(shí)現(xiàn)操作數(shù)據(jù)庫的接口的過程
本文給大家分享使用Spring Boot實(shí)現(xiàn)操作數(shù)據(jù)庫的接口的過程,包括springboot原理解析及實(shí)例代碼詳解,感興趣的朋友跟隨小編一起看看吧2021-07-07
關(guān)于spring data jpa一級(jí)緩存的問題
這篇文章主要介紹了關(guān)于spring data jpa一級(jí)緩存的問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11
SpringBoot使用Netty實(shí)現(xiàn)遠(yuǎn)程調(diào)用的示例
這篇文章主要介紹了SpringBoot使用Netty實(shí)現(xiàn)遠(yuǎn)程調(diào)用的示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10
SpringBoot如何整合mybatis-generator-maven-plugin 1.4.0
這篇文章主要介紹了SpringBoot整合mybatis-generator-maven-plugin 1.4.0的實(shí)現(xiàn)方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2023-01-01
Java Eclipse中實(shí)現(xiàn)快速替換變量
這篇文章主要介紹了Java Eclipse中實(shí)現(xiàn)快速替換變量,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09

