springboot 3.x 整合 RocketMQ 5.x的詳細過程
RocketMQ 5.x在SpringBoot中的上手使用過程
注意:rocketmq-v5-client-spring-boot-starter對springboot版本有要求,至少2.0.6.RELEASE版本的springboot無法整合。
準備環(huán)境
- JDK 17
- Spring Boot 3.2.3
- RocketMQ(服務端) 5.3.1
- rocketmq-v5-client-spring-boot-starter(客戶端) 2.3.1
在 SpringBoot 項目中依賴如下配置:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>2.3.1</version>
</dependency>如果還未搭建服務端,可以先看第5節(jié)-服務器環(huán)境搭建。
參數(shù)配置
按照 SpringBoot 的約定習俗,在上手一個新的 spring-boot-starter項目時,想要知道怎么使用它,看它的 AutoConfiguration 就對了。
在 rocketmq-v5-client-spring-boot中,對應的 AutoConfiguration 類為 RocketMQAutoConfiguration,其類定義部分代碼如下:
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtTemplateResetConfiguration.class,
ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
// ... 省略
@Bean(PRODUCER_BUILDER_BEAN_NAME)
@ConditionalOnMissingBean(ProducerBuilderImpl.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"producer.endpoints"})
public ProducerBuilder producerBuilder(RocketMQProperties rocketMQProperties) {
// ... 省略
}
@Bean(SIMPLE_CONSUMER_BUILDER_BEAN_NAME)
@ConditionalOnMissingBean(SimpleConsumerBuilder.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"simple-consumer.endpoints"})
public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQProperties) {
// ... 省略
}
@Bean(destroyMethod = "destroy")
@Conditional(ProducerOrConsumerPropertyCondition.class)
@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
public RocketMQClientTemplate rocketMQClientTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
// ... 省略
}
}可以發(fā)現(xiàn),在rocketmq-v5-client-spring-boot中,根據(jù) RocketMQ 5.x 在架構上做的改進,使用了 endpoints 來替代傳統(tǒng)的 namesrvAddr,以支持更靈活的網絡拓撲和云原生架構。endpoints 通常指向 RocketMQ 的 Broker 或 Nameserver 地址,用于生產者與 RocketMQ 集群建立連接。endpoints 是一個 URL 或 IP 地址(ip:host)列表(使用;分割)。
??注意:在 RocketMQ 5.x 中,現(xiàn)已默認使用gRPC作為通信協(xié)議,entpoints更建議指向 Proxy 地址,一般默認端口為8081。
因此,現(xiàn)在想要啟用默認的生產者(ProducerBuilder),只需要配置rocketmq.producer.endpoints即可。
想要啟用默認的消費者(SimpleConsumerBuilder),只需要配置rocketmq.simple-consumer.endpoints即可。
而RocketMQClientTemplate則是通過判斷當前應用上下文是否含有ProducerBuilder或SimpleConsumerBuilder Bean對象生成而來。它屬于rocketmq-v5-client-spring-boot模塊下,也就是說它利用了Spring特性,提供了Spring風格的API,方便開發(fā)者通過 Spring 的編程模型來進行消息發(fā)送和接收。
既然是原生態(tài)的簡易使用教程,那么就盡可能在不寫多的代碼的情況下,實現(xiàn)生產環(huán)境中使用MQ。
因此,本次項目就只配置 rocketmq.producer.endpoints 用于啟用默認的生產者,消費者使用Push消費模式,所以配置rocketmq.push-consumer.endpoints。配置如下:
rocketmq:
producer:
endpoints: localhost:8081
push-consumer:
endpoints: localhost:8081topic在代碼中指定,不使用rocketmq.producer.topic和rocketmq.push-consumer.topic配置默認的topic。
tips: 在啟動客戶端服務時,topic需要先創(chuàng)建,否則會啟動報錯。
生產者生產消息
生產消息通過SpringBoot自動裝配的RocketMQClientTemplate對象實現(xiàn),發(fā)送Message對象,示例代碼如下:
@Service
public class MyService {
@Autowired
private RocketMQClientTemplate rocketMQClientTemplate;
public void sendMessage() {
byte[] bytes = "這是一個字符串".getBytes(StandardCharsets.UTF_8);
Message<byte[]> message = MessageBuilder.withPayload(bytes).build();
rocketMQClientTemplate.send("MyTopic", message);
}
}??注意:在 RocketMQ 5.x 中,
Message對象已從自定義對象改為spring-messaging包中的Message對象。一般通過MessageBuilder構建,實例對象類型為GenericMessage。
消費者消費消息
消費者通過@RocketMQMessageListener注解,并實現(xiàn)RocketMQListener接口消費消息,示例代碼如下:
@Service
@RocketMQMessageListener(consumerGroup = "MyTopic-service", topic = "MyTopic", tag = "*")
public class MyService implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
// 從 MessageView 中獲取 ByteBuffer
ByteBuffer byteBuffer = messageView.getBody();
// 轉換 ByteBuffer 為字節(jié)數(shù)組
byte[] body = new byte[byteBuffer.remaining()];
byteBuffer.get(body);
// 處理字節(jié)數(shù)組,例如轉換為字符串
String messageBody = new String(body, StandardCharsets.UTF_8);
System.out.println("消費消息內容:" + messageBody);
return ConsumeResult.SUCCESS;
}
}服務端環(huán)境搭建
下載二進制包
在 Apache RocketMQ 本地部署 RocketMQ 文檔中,可以找到最新的二進制包,位置如下:

如果想保持跟本文相同版本,可以直接點擊鏈接下載RocketMQ 5.3.1版本。
啟動NameServer
#### 啟動namesrv $ nohup sh bin/mqnamesrv & #### 驗證namesrv是否啟動成功 $ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
本地模式啟動Broker+Proxy
#### 先啟動broker $ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy & #### 驗證broker是否啟動成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a $ tail -f ~/logs/rocketmqlogs/proxy.log The broker[broker-a,192.169.1.2:10911] boot success...
mqbroker腳本默認會讀取 conf/broker.conf 配置用于Broker服務。在 conf/rmq-proxy.json 中是Proxy服務的配置,通過 --enable-proxy 命令啟動時,需要加上 -pc conf/rmq-proxy.json 參數(shù)指定配置文件位置。
broker.conf的監(jiān)聽端口key為listenPort,管理端口key為brokerAdminPort。
rmq.proxy.json的gRPC請求端口key為grpcServerPort,傳統(tǒng)的消息發(fā)送和接收請求的端口key為remotingListenPort。
- 關閉服務
- 停止Broker:
sh bin/mqshutdown broker - 停止NameServer:
sh bin/mqshutdown namesrv
- 停止Broker:
關于RocketMQ的管理命令可以參考Admin Tool。
links:
RocketMQ 5.x在SpringBoot中的上手使用過程
到此這篇關于springboot 3.x 整合 RocketMQ 5.x的詳細過程的文章就介紹到這了,更多相關springboot 3.x 整合 RocketMQ內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
servlet之ServletContext簡介_動力節(jié)點Java學院整理
這篇文章主要介紹了servlet之ServletContext簡介,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07
Java從零編寫吃貨聯(lián)盟訂餐系統(tǒng)全程講解
這篇文章主要介紹了Java訂餐系統(tǒng),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-12-12
Spring Boot實現(xiàn)數(shù)據(jù)訪問計數(shù)器方案詳解
在Spring Boot項目中,有時需要數(shù)據(jù)訪問計數(shù)器,怎么實現(xiàn)數(shù)據(jù)訪問計數(shù)器呢?下面小編給大家?guī)砹薙pring Boot數(shù)據(jù)訪問計數(shù)器的實現(xiàn)方案,需要的朋友參考下吧2021-08-08
SpringBoot Jpa 自定義查詢實現(xiàn)代碼詳解
這篇文章主要介紹了SpringBoot Jpa 自定義查詢實現(xiàn)代碼詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-02-02

