SpringCloud?Stream?整合RabbitMQ的基本步驟
本篇簡(jiǎn)單介紹SpringCloud Stream 整合RabbitMQ基本步驟:
- 引入SpringCloud
- 引入SpringCloud Stream相關(guān)依賴
- 定義綁定接口: 消息生產(chǎn)者(Output…Binding) 、消息消費(fèi)者(Input…Binding)
- @EnableBinding 在對(duì)應(yīng)類上進(jìn)行定義
- @StreamListener 在對(duì)應(yīng)方法上創(chuàng)建監(jiān)聽用來(lái)消費(fèi)消息
- 調(diào)用output的send()方法生產(chǎn)消息
一、項(xiàng)目介紹
演示SpringCloud Stream 整合RabbitMQ,項(xiàng)目可以在一個(gè)工程里完成,本次建立了一個(gè)工程mq-service,其中包含三個(gè)Module:
- mq-service-base :基礎(chǔ)模塊(包含了共用依賴、共用變量)
- mq-service-producer :生產(chǎn)者
- mq-service-consumer :消費(fèi)者
注: 完全可以在一個(gè)工程里實(shí)現(xiàn),這里為了區(qū)分,并為了后續(xù)單獨(dú)啟動(dòng)或停止生產(chǎn)者或消費(fèi)者做實(shí)驗(yàn),也為了適應(yīng)實(shí)際應(yīng)用項(xiàng)目,所以創(chuàng)建了不同Module
(1)版本
- SpringBoot : 2.0.6.RELEASE
- SpringCloud : Finchley.SR2
- RabbitMQ : 3.8.1
(2)項(xiàng)目整體結(jié)構(gòu)

(3)基礎(chǔ)模塊
1)pom.xml
這里作為公共模塊引入SpringCloud、Spring Cloud Stream等,其中也再此引入fastjson、lombok等工具依賴
(完整代碼見文章最下面)
其中Spring Cloud Stream如下:
<!-- Spring Cloud Stream, 用于MQ消息發(fā)送--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2) model
定義共用的變量,如CollectionRequest.java
二、生產(chǎn)者
(1)結(jié)構(gòu)

(2)pom.xml
導(dǎo)入base的依賴即可,因?yàn)橄嚓P(guān)共用依賴在base中已經(jīng)引入
<dependency> <groupId>com.zrk</groupId> <artifactId>mq-service-base</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
(3)定義綁定(接口)
OutputMessageBinding.java
public interface OutputMessageBinding {
/** Topic 名稱*/
String OUTPUT = "message-center-out";
@Output(OUTPUT)
MessageChannel output();
}(4)添加配置
# rabbitmq連接信息 spring.rabbitmq.addresses=192.168.1.125 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.cloud.stream.bindings.message-center-out.destination=message-center spring.cloud.stream.rabbit.bindings.message-center-out.consumer.exchangeType=fanout
(5) 調(diào)用方法
CollectionServiceImpl.java
@Service
@EnableBinding(OutputMessageBinding.class)
public class CollectionServiceImpl implements CollectionService{
@Resource
private OutputMessageBinding outputMessageBinding;
/**
* @param schoolName
* @param content
*/
@Override
public void getCollection(String schoolName, String content) {
CollectionRequest request = new CollectionRequest();
request.setSchoolName(schoolName);
request.setContent(content);
outputMessageBinding.output().send(MessageBuilder.withPayload(request).build());
}
}注: 主要是兩點(diǎn)
- @EnableBinding 定義
- outputMessageBinding.output().send(MessageBuilder.withPayload(request).build()); 生產(chǎn)消息
三、消費(fèi)者
(1)結(jié)構(gòu)

(2)pom.xml
導(dǎo)入base的依賴即可,因?yàn)橄嚓P(guān)共用依賴在base中已經(jīng)引入
<dependency> <groupId>com.zrk</groupId> <artifactId>mq-service-base</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
(3)定義綁定(接口)
InputMessageBinding.java
public interface InputMessageBinding {
String INPUT = "message-center-input";
@Input(INPUT)
SubscribableChannel input();
}注: 消費(fèi)者這里與生產(chǎn)者不同,用的是SubscribableChannel ,而生產(chǎn)者用的是MessageChannel
(4)添加配置
# rabbitmq連接信息
spring.rabbitmq.addresses=192.168.1.125
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.cloud.stream.bindings.message-center-input.destination=message-center
spring.cloud.stream.bindings.message-center-input.group=${spring.application.name}(5) 調(diào)用方法
CollectionReceiver.java
@Slf4j
@EnableBinding(InputMessageBinding.class)
public class CollectionReceiver {
@StreamListener(InputMessageBinding.INPUT)
public void handle(String value){
log.info("[消息] 接收到發(fā)送消息MQ: {}", value);
CollectionRequest request = JSON.parseObject(value, CollectionRequest.class);
log.info("處理收集信息:" + request.toString());
}
}注: 主要是兩點(diǎn)
- @EnableBinding 定義
- @StreamListener 注冊(cè)監(jiān)聽
至此,生產(chǎn)者與消費(fèi)者都創(chuàng)建完成,分別啟動(dòng)兩個(gè)項(xiàng)目,并調(diào)用生產(chǎn)者接口進(jìn)行驗(yàn)證:
四、驗(yàn)證 在postman 訪問(wèn)生產(chǎn)者接口:
localhost:30110/collection/getCollectionschoolName=‘zrk’&content=‘send message to rabbitmq’

觀察消費(fèi)者日志:

查看rabbitmq首頁(yè)


則證明已經(jīng)整合成功,接下來(lái)將研究一下更多的配置與用法。
如果有需要,可以參考項(xiàng)目完整代碼:https://github.com/zrk333/mq-service
到此這篇關(guān)于SpringCloud Stream 整合RabbitMQ的文章就介紹到這了,更多相關(guān)SpringCloud Stream 整合RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于Spring MVC框架中攔截器Interceptor的使用解讀
這篇文章主要介紹了關(guān)于Spring MVC框架中攔截器Interceptor的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07
java實(shí)現(xiàn)簡(jiǎn)易外賣訂餐系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)簡(jiǎn)易外賣訂餐系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10
關(guān)于kafka消費(fèi)不到遠(yuǎn)程bootstrap-server?數(shù)據(jù)的問(wèn)題
很多朋友遇到kafka消費(fèi)不到遠(yuǎn)程bootstrap-server?數(shù)據(jù)的問(wèn)題,怎么解決這個(gè)問(wèn)題,很多朋友不知所措,下面小編給大家?guī)?lái)了關(guān)于kafka消費(fèi)不到遠(yuǎn)程bootstrap-server?數(shù)據(jù)的問(wèn)題及解決方法,感興趣的朋友跟隨小編一起看看吧2021-11-11
服務(wù)器實(shí)現(xiàn)Java遠(yuǎn)程訪問(wèn)Linux服務(wù)器方式(JSch)
文章介紹了如何使用Java遠(yuǎn)程訪問(wèn)Linux服務(wù)器,主要包括建立SSH連接、使用JSch庫(kù)執(zhí)行命令、解析返回值以及關(guān)閉連接的步驟2024-11-11
Java基礎(chǔ)鞏固小項(xiàng)目點(diǎn)菜系統(tǒng)的實(shí)現(xiàn)
這篇文章主要介紹了一個(gè)Java小項(xiàng)目點(diǎn)菜系統(tǒng)的實(shí)現(xiàn),主要是用的集合,適合正在學(xué)習(xí)Java的朋友拿來(lái)實(shí)戰(zhàn)練手,感興趣的朋友快來(lái)看看吧2022-03-03
hibernate一對(duì)多關(guān)聯(lián)映射學(xué)習(xí)小結(jié)
這篇文章主要介紹了hibernate一對(duì)多關(guān)聯(lián)映射學(xué)習(xí)小結(jié),需要的朋友可以參考下2017-09-09
mybatis簡(jiǎn)介與配置_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了mybatis簡(jiǎn)介與配置,介紹了MyBatis+Spring+MySql簡(jiǎn)單配置,有興趣的可以了解一下2017-09-09
OpenFeign超時(shí)時(shí)間設(shè)置不生效問(wèn)題排查記錄
文章主要講述了在升級(jí)Spring Boot 3后,發(fā)現(xiàn)配置文件中的OpenFeign超時(shí)時(shí)間設(shè)置不生效的問(wèn)題,通過(guò)查看FeignClientFactoryBean類和FeignClientProperties類的源碼,發(fā)現(xiàn)配置讀取的方式發(fā)生了變化,從而導(dǎo)致超時(shí)時(shí)間設(shè)置不生效2024-11-11
使用MyBatis-Generator如何自動(dòng)生成映射文件
這篇文章主要介紹了使用MyBatis-Generator如何自動(dòng)生成映射文件,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02

