springboot-rabbitmq-reply?消息直接回復(fù)模式詳情
一、使用場景
MQ的作用包括了解耦、異步等。
通常生產(chǎn)者只負(fù)責(zé)生產(chǎn)消息,而不關(guān)心消息誰去獲取,或者消費結(jié)果如何;消費者只負(fù)責(zé)接收指定的消息進行業(yè)務(wù)處理而不關(guān)心消息從哪里來一級回復(fù)業(yè)務(wù)處理情況。但我們項目中有特殊的業(yè)務(wù)存在,我們作為消息生產(chǎn)者在生產(chǎn)消息后需要接收消費者的響應(yīng)結(jié)果(說白了就是類似同步調(diào)用 請求響應(yīng)的MQ使用),經(jīng)過研究,MQ的Reply模式(直接回復(fù)模式)就是為此種業(yè)務(wù)模式而產(chǎn)生。
二、Reply實戰(zhàn)
(1)依賴與YML配置
依賴:
我這里只列出最核心的rabbitMq所需依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>配置:
無其余特殊配置,因為reply就是rabbitmq的一種交互方式而已
spring:
rabbitmq:
host: 10.50.40.116
port: 5673
username: admin
password: admin(2)RabbitMq bean配置
package com.leilei.demo;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author lei
* @create 2022-09-19 21:44
* @desc mq配置
**/
@Configuration
public class RabbitMqConfig {
@Bean
public Queue bizQueue() {
return new Queue("bizQueue");
}
@Bean
public Queue replyQueue() {
return new Queue("replyQueue");
}
@Bean
FanoutExchange bizExchange() {
return new FanoutExchange("bizExchange");
}
}業(yè)務(wù)類:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Vehicle implements Serializable {
private Integer id;
private String name;
}(3)消息生產(chǎn)端
消息生產(chǎn)端需要做的事情:有生產(chǎn)消息、接受消息消費響應(yīng)
(1)生產(chǎn)消息
- 1、生產(chǎn)消息,看業(yè)務(wù)場景選擇是否生成全局唯一自定義的消息ID
- 2、指定消息消費后響應(yīng)的隊列(Reply)
/**
* 生產(chǎn)消息
*
* @param
* @return void
* @author lei
* @date 2022-09-19 21:59:18
*/
public void replySend() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setReplyTo("replyQueue");
//todo 根據(jù)業(yè)務(wù),做一個嚴(yán)謹(jǐn)?shù)娜治ㄒ籌D,我這里暫時用UUID
String correlationId = UUID.randomUUID().toString();
// 我這里指定了唯一消息ID,看業(yè)務(wù)場景,消費者消費響應(yīng)后,生產(chǎn)者端可根據(jù)消息ID做業(yè)務(wù)處理
messageProperties.setCorrelationId(correlationId);
Vehicle vehicle = new Vehicle(1, "川A0001");
Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties);
rabbitTemplate.convertAndSend("bizExchange","",message);
System.out.println("生產(chǎn)者發(fā)送消息,自定義消息ID為:" + correlationId);
}(2)接受Reply響應(yīng)
消費者消費消息后會將處理結(jié)果進行發(fā)送到一個隊列,我們讀取這里隊列就可以拿到對應(yīng)消息的響應(yīng)結(jié)果進行業(yè)務(wù)處理了
/**
* 接收消息響應(yīng)
*
* @param message
* @return void
* @author lei
* @date 2022-09-19 21:59:27
*/
@RabbitListener(queues = "replyQueue")
public void replyResponse(Message message) {
String s = new String(message.getBody());
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println("收到客戶端響應(yīng)消息ID:" + correlationId);
//todo 根據(jù)消息ID可判斷這是哪一個消息的響應(yīng),我們就可做業(yè)務(wù)操作
System.out.println("收到客戶端響應(yīng)消息:" + s);
}(4)消息消費端
消息消費端需要做的事有:接受消息然后進行業(yè)務(wù)處理、響應(yīng)消息
(1)方法一:sendTo注解+方法返回值
一般來說,我們mq消費者監(jiān)聽方法不需要返回值,我們這里使用sendTo注解,則需要將要響應(yīng)的消息定義為返回值,sendTo注解中指定要響應(yīng)到哪個隊列
重點:
- 1、sendTo注解指定要相應(yīng)的隊列(注意和生產(chǎn)端保持一致)
- 2、方法定義的返回值內(nèi)容就是要響應(yīng)的消息,最終會發(fā)送到sendTo注解指定要相應(yīng)的隊列
- 3、這種方法的缺點是消費端的主關(guān)性很高,因為sendTo指定的目標(biāo)隊列可以自己瞎寫,導(dǎo)致生產(chǎn)者端無法正確收到消息響應(yīng),但我相信一般項目中也不會這么干
/**
* 方式1 SendTo指定響應(yīng)隊列
*
* @param message
* @return String
* @author lei
* @date 2022-09-19 16:17:52
*/
@RabbitListener(queues ="bizQueue")
@SendTo("replyQueue")
public String handleEmailMessage(Message message) {
try {
String msg=new String(message.getBody(), StandardCharsets.UTF_8);
log.info("---consumer接收到消息----{}",msg);
return "客戶端響應(yīng)消息:"+msg+"處理完成!";
} catch (Exception e) {
log.error("處理業(yè)務(wù)消息失敗",e);
}
return null;
}(2)方法二:讀取生產(chǎn)端的消息使用模板發(fā)送
與普通的消費者方法一樣,只需要RabbitListener注解監(jiān)聽業(yè)務(wù)隊列;但還需要根據(jù)消息獲取出ReplyTo地址,然后自己消費者方法內(nèi)部手動發(fā)送消息
- 1、優(yōu)點,更強烈的感受到消息請求 響應(yīng)的交互性,流程看起來更清晰
- 2、缺點,代碼不雅
/**
* 方式2 message消息獲取內(nèi)部reply rabbitmq手動發(fā)送
*
* @param message
* @return String
* @author lei
* @date 2022-09-19 16:17:52
*/
@RabbitListener(queues = "bizQueue")
public void handleEmailMessage2(Message message) {
try {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("---consumer接收到消息----{}", msg);
String replyTo = message.getMessageProperties().getReplyTo();
System.out.println("接收到的reply:" + replyTo);
rabbitTemplate.convertAndSend(replyTo, "客戶端響應(yīng)消息:" + msg + "處理完成!", x -> {
x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());
return x;
});
} catch (Exception e) {
log.error("處理業(yè)務(wù)消息失敗",e);
}
}(3)方法三:方法返回值
這種方式與1其實是一致的,但我經(jīng)過測試,因為生產(chǎn)者消息指定了ReplyTo的地址,消費者端無需自己再次手動指定,即生產(chǎn)消息到哪里,是否響應(yīng)以及響應(yīng)消息發(fā)送到哪里全由生產(chǎn)端自己空,消費者只需要處理自身業(yè)務(wù)以及返回結(jié)果
/**
* 方式三 方法有返回值,返回要響應(yīng)的數(shù)據(jù) (reply 由生產(chǎn)者發(fā)送消息時指定,消費者不做任何處理)
*
* @param message
* @return String
* @author lei
* @date 2022-09-19 23:17:47
*/
@RabbitListener(queues ="bizQueue")
public String handleEmailMessage3(Message message) {
try {
String msg=new String(message.getBody(), StandardCharsets.UTF_8);
log.info("---consumer接收到消息----{}",msg);
return "客戶端響應(yīng)消息:"+msg+"處理完成!";
}
catch (Exception e) {
log.error("處理業(yè)務(wù)消息失敗",e);
}
return null;
}(4)測試
生產(chǎn)消息:

消費消息與響應(yīng):

收到的響應(yīng):

鏈路:

如此,MQ版本的請求響應(yīng)模式就完成了,其實很多大佬使用MQ來實現(xiàn)RPC就是用的ReplyTo啦!
到此這篇關(guān)于springboot-rabbitmq-reply 消息直接回復(fù)模式詳情的文章就介紹到這了,更多相關(guān)springboot-rabbitmq-reply 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Backoff策略提高HttpClient連接管理的效率
這篇文章主要為大家介紹了Backoff策略提高HttpClient連接管理的效率使用解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-10-10
Java Spring中Quartz調(diào)度器詳解及實例
這篇文章主要介紹了Java Spring中Quartz調(diào)度器詳解及實例的相關(guān)資料,需要的朋友可以參考下2017-02-02
Springboot 使用 JSR 303 對 Controller 控制層校驗及 Service 服務(wù)層 AOP 校驗
這篇文章主要介紹了Springboot 使用 JSR 303 對 Controller 控制層校驗及 Service 服務(wù)層 AOP 校驗 使用消息資源文件對消息國際化的相關(guān)知識,需要的朋友可以參考下2017-12-12
Java數(shù)據(jù)結(jié)構(gòu)之圖的基礎(chǔ)概念和數(shù)據(jù)模型詳解
在現(xiàn)實生活中,有許多應(yīng)用場景會包含很多點以及點點之間的連接,而這些應(yīng)用場景我們都可以用即將要學(xué)習(xí)的圖這種數(shù)據(jù)結(jié)構(gòu)去解決。本文主要介紹了圖的基礎(chǔ)概念和數(shù)據(jù)模型,感興趣的可以了解一下2022-11-11

