Spring整合消息隊列RabbitMQ流程
搭建生產(chǎn)者工程
創(chuàng)建工程


添加依賴
修改pom.xml文件內(nèi)容為如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>spring-rabbitmq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
</project>
配置整合
創(chuàng)建spring-rabbitmq-producer\src\main\resources\properties\rabbitmq.properties連接參數(shù)等配置文件;
rabbitmq.host=192.168.12.135
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast
創(chuàng)建 spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml 整合配置文件;
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載配置文件-->
<context:property-placeholder location="classpath:properties/rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定義管理交換機、隊列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義持久化隊列,不存在則自動創(chuàng)建;不綁定到交換機則綁定到默認交換機
默認交換機類型為direct,名字為:"",路由鍵為隊列的名稱
id:bean的名稱
name : queue的名稱
auto-declare 自動創(chuàng)建
auto-delete 自動刪除,最后一個消費者和該隊列斷開連接后自動刪除
duable 是否持久化
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~廣播;所有隊列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化隊列,不存在則自動創(chuàng)建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<!--定義廣播交換機中的持久化隊列,不存在則自動創(chuàng)建-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定義廣播類型交換機;并綁定上述兩個隊列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--<rabbit:direct-exchange name=“aa”">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1 key="xx"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:direct-exchange>-->
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一個單詞,#匹配多個單詞 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化隊列,不存在則自動創(chuàng)建-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<!--定義廣播交換機中的持久化隊列,不存在則自動創(chuàng)建-->
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<!--定義廣播交換機中的持久化隊列,不存在則自動創(chuàng)建-->
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定義rabbitTemplate對象操作可以在代碼中方便發(fā)送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>發(fā)送消息
創(chuàng)建測試文件 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 只發(fā)隊列消息
* 默認交換機類型為 direct
* 交換機的名稱為空,路由鍵為隊列的名稱
*/
@Test
public void queueTest(){
//路由鍵與隊列同名
rabbitTemplate.convertAndSend("spring_queue", "只發(fā)隊列spring_queue的消息。");
}
/**
* 發(fā)送廣播
* 交換機類型為 fanout
* 綁定到該交換機的所有隊列都能夠收到消息
*/
@Test
public void fanoutTest(){
/**
* 參數(shù)1:交換機名稱
* 參數(shù)2:路由鍵名(廣播設(shè)置為空)
* 參數(shù)3:發(fā)送的消息內(nèi)容
*/
rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "發(fā)送到spring_fanout_exchange交換機的廣播消息");
}
/**
* 通配符
* 交換機類型為 topic
* 匹配路由鍵的通配符,*表示一個單詞,#表示多個單詞
* 綁定到該交換機的匹配隊列能夠收到對應(yīng)消息
*/
@Test
public void topicTest(){
/**
* 參數(shù)1:交換機名稱
* 參數(shù)2:路由鍵名
* 參數(shù)3:發(fā)送的消息內(nèi)容
*/
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj", "發(fā)送到spring_topic_exchange交換機heima.bj的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.1", "發(fā)送到spring_topic_exchange交換機heima.bj.1的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.2", "發(fā)送到spring_topic_exchange交換機heima.bj.2的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "itcast.cn", "發(fā)送到spring_topic_exchange交換機itcast.cn的消息");
}
}搭建消費者工程
創(chuàng)建工程
添加依賴
修改pom.xml文件內(nèi)容為如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>spring-rabbitmq-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
</dependencies>
</project>
配置整合
創(chuàng)建spring-rabbitmq-consumer\src\main\resources\properties\rabbitmq.properties連接參數(shù)等配置文件;
rabbitmq.host=192.168.12.135
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast
創(chuàng)建 spring-rabbitmq-consumer\src\main\resources\spring\spring-rabbitmq.xml 整合配置文件;
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載配置文件-->
<context:property-placeholder location="classpath:properties/rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/>
<bean id="fanoutListener1" class="com.itheima.rabbitmq.listener.FanoutListener1"/>
<bean id="fanoutListener2" class="com.itheima.rabbitmq.listener.FanoutListener2"/>
<bean id="topicListenerStar" class="com.itheima.rabbitmq.listener.TopicListenerStar"/>
<bean id="topicListenerWell" class="com.itheima.rabbitmq.listener.TopicListenerWell"/>
<bean id="topicListenerWell2" class="com.itheima.rabbitmq.listener.TopicListenerWell2"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
</rabbit:listener-container>
</beans>消息監(jiān)聽器
1)隊列監(jiān)聽器
創(chuàng)建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\SpringQueueListener.java
public class SpringQueueListener implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名稱為:%s,路由鍵為:%s,隊列名為:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2)廣播監(jiān)聽器1
創(chuàng)建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\FanoutListener1.java
public class FanoutListener1 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("廣播監(jiān)聽器1:接收路由名稱為:%s,路由鍵為:%s,隊列名為:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3)廣播監(jiān)聽器2
創(chuàng)建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\FanoutListener2.java
public class FanoutListener2 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("廣播監(jiān)聽器2:接收路由名稱為:%s,路由鍵為:%s,隊列名為:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4)星號通配符監(jiān)聽器
創(chuàng)建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerStar.java
public class TopicListenerStar implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配符*監(jiān)聽器:接收路由名稱為:%s,路由鍵為:%s,隊列名為:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
5)井號通配符監(jiān)聽器
創(chuàng)建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerWell.java
public class TopicListenerWell implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配符#監(jiān)聽器:接收路由名稱為:%s,路由鍵為:%s,隊列名為:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
6)井號通配符監(jiān)聽器2
創(chuàng)建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerWell2.java
public class TopicListenerWell2 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配符#監(jiān)聽器2:接收路由名稱為:%s,路由鍵為:%s,隊列名為:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
到此這篇關(guān)于Spring整合消息隊列RabbitMQ流程的文章就介紹到這了,更多相關(guān)Spring整合RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot整合mybatisplus時,使用條件構(gòu)造器排序報錯問題及解決
這篇文章主要介紹了Springboot整合mybatisplus時,使用條件構(gòu)造器排序報錯問題及解決,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-04-04
eclipse導(dǎo)入工程報錯問題項目或者文件有紅叉的解決方案
這篇文章主要介紹了eclipse導(dǎo)入工程報錯問題項目或者文件有紅叉的解決方案,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05
springboot整合minio實現(xiàn)文件上傳與下載且支持鏈接永久訪問
本文主要介紹了springboot整合minio實現(xiàn)文件上傳與下載且支持鏈接永久訪問,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-01-01
Spring Schedule Task動態(tài)改寫Cron配置方式
這篇文章主要介紹了Spring Schedule Task動態(tài)改寫Cron配置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11

