RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
1. 概述
老話說(shuō)的好:做人要懂得變通,善于思考,有時(shí)稍微轉(zhuǎn)個(gè)彎,也許問(wèn)題就解決了。
言歸正傳,之前我們聊了 RabbitMQ 3.9.7 鏡像模式集群的搭建,今天我們來(lái)聊聊 RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合。
2. 場(chǎng)景說(shuō)明
服務(wù)器A IP:192.168.1.22
服務(wù)器B IP:192.168.1.8
服務(wù)器C IP:192.168.1.144
此三臺(tái)服務(wù)器上已搭建好了 RabbitMQ鏡像模式集群,鏡像模式集群的搭建,可參見(jiàn)我的上一篇文章。
3. 與Springboot的整合
3.1 引入依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 生產(chǎn)服務(wù)配置
spring:
rabbitmq:
addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 16000
# 啟用消息確認(rèn)模式
publisher-confirm-type: correlated
# 啟用 return 消息模式
publisher-returns: true
template:
mandatory: true
3.3 生產(chǎn)服務(wù)代碼
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 確認(rèn)回調(diào)
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// correlationData 唯一標(biāo)識(shí)
// ack mq是否收到消息
// cause 失敗原因
System.out.println("correlationData:" + correlationData.getId());
System.out.println("ack:" + ack);
System.out.println("cause:" + cause);
}
};
/**
* 發(fā)送消息
* @param messageBody 消息體
* @param headers 附加屬性
* @throws Exception
*/
public void sendMessage(String messageBody, Map<String, Object> headers, String id) throws Exception {
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message<String> message = MessageBuilder.createMessage(messageBody, messageHeaders);
rabbitTemplate.setConfirmCallback(confirmCallback);
String exchangeName = "exchange-hello";
String routingKey = "test.123";
CorrelationData correlationData = new CorrelationData(id);
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new MessagePostProcessor() {
/**
* 發(fā)送消息后做的事情
* @param message
* @return
* @throws AmqpException
*/
@Override
public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
return message;
}
}, correlationData);
}
}
3.4 消費(fèi)服務(wù)配置
spring:
rabbitmq:
addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 16000
listener:
simple:
# 設(shè)置為手工ACK
acknowledge-mode: manual
concurrency: 5
prefetch: 1
max-concurrency: 10
3.5 消費(fèi)服務(wù)代碼
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-hello", durable = "true"),
exchange = @Exchange(value = "exchange-hello" , durable = "true", type = "topic"),
key = "test.*"
))
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("收到消息:" + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
3.6 Rest 測(cè)試代碼
@RestController
@RequestMapping("/mq")
public class RabbitmqController {
@Autowired
private Producer producer;
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam String messageBody, @RequestParam String id) throws Exception {
Map<String, Object> headers = new HashMap<>();
producer.sendMessage(messageBody, headers, id);
return "success";
}
}
4. 綜述
到此這篇關(guān)于RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合的文章就介紹到這了,更多相關(guān)RabbitMQ鏡像模式集群內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringAMQP消息隊(duì)列(SpringBoot集成RabbitMQ方式)
- 一文掌握Springboot集成RabbitMQ的方法
- springboot2.5.6集成RabbitMq實(shí)現(xiàn)Topic主題模式(推薦)
- Springboot集成RabbitMQ死信隊(duì)列的實(shí)現(xiàn)
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
- springboot2.0集成rabbitmq的示例代碼
- Spring Boot系列教程之7步集成RabbitMQ的方法
- springboot集成rabbitMQ之對(duì)象傳輸?shù)姆椒?/a>
- spring boot集成rabbitmq的實(shí)例教程
- 詳解spring boot集成RabbitMQ
- Spring Boot 3 集成 RabbitMQ 實(shí)踐指南(原理解析)
相關(guān)文章
Java調(diào)用GPU算力的實(shí)現(xiàn)示例
本文主要介紹了Java調(diào)用GPU算力的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2025-03-03
SpringBoot+MyBatis+AOP實(shí)現(xiàn)讀寫(xiě)分離的示例代碼
高并發(fā)這個(gè)階段,肯定是需要做MySQL讀寫(xiě)分離的。本文主要介紹了SpringBoot+MyBatis+AOP實(shí)現(xiàn)讀寫(xiě)分離的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11
springboot 整合druid數(shù)據(jù)庫(kù)密碼加密功能的實(shí)現(xiàn)代碼
這篇文章主要介紹了springboot 整合druid數(shù)據(jù)庫(kù)密碼加密功能的實(shí)現(xiàn)代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01
總結(jié)Java常用的時(shí)間相關(guān)轉(zhuǎn)化
今天給大家?guī)?lái)的是關(guān)于Java的相關(guān)知識(shí),文章圍繞著Java常用的時(shí)間相關(guān)轉(zhuǎn)化展開(kāi),文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06
Tomcat集群和Session復(fù)制應(yīng)用介紹
本文將詳細(xì)介紹Tomcat集群和Session復(fù)制應(yīng)用,需要了解的朋友可以參考下2012-11-11
mybatis中關(guān)于mapper的使用以及注意事項(xiàng)
這篇文章主要介紹了mybatis中關(guān)于mapper的使用以及注意事項(xiàng),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-06-06
TransmittableThreadLocal解決線程間上下文傳遞煩惱
這篇文章主要為大家介紹了TransmittableThreadLocal解決線程間上下文傳遞煩惱詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11

