SpringBoot集成MQ的過程(四種交換機的實例)
?RabbitMQ交換機(Exchange)的核心作用
在RabbitMQ中,?交換機 是消息路由的核心組件,負責接收生產者發(fā)送的消息,并根據規(guī)則(如路由鍵、頭信息等)將消息分發(fā)到對應的隊列中。
不同交換機類型決定了消息的路由邏輯,使用不同的交換機在不同的場景下可以提高消息系統(tǒng)的高可用性。
1. 直連交換機(Direct Exchange)?
?路由機制 ?
- 精確匹配路由鍵(Routing Key)?:消息會被發(fā)送到與
Routing Key?完全匹配 的隊列。 - ?典型場景:一對一或一對多的精確消息分發(fā)。
應用場景 ?
- 任務分發(fā):如訂單處理系統(tǒng),根據訂單類型(如
order.payment、order.shipping)分發(fā)到不同隊列。 - ?日志分類:將不同級別的日志(
log.error、log.info)路由到對應的處理服務。
使用直連交換機實現消息發(fā)送和接收
1.創(chuàng)建一個SpringBoot項目,在yml文件配置如下:
server:
port: 8021
spring:
application:
name: rabbitmq-provider
#配置rabbitMq 服務器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest2.初始化隊列和交換機,并進行綁定
package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 功能:
* 作者:程序員ZXY
* 日期:2025/3/8 下午1:55
*/
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue TestDirectQueue(){
return new Queue("TestDirectQueue",true);
}
@Bean
DirectExchange TestDirectExchange(){
return new DirectExchange("TestDirectExchange",true,false);
}
@Bean
Binding bindingDirect(){
return BindingBuilder.bind(TestDirectQueue())
.to(TestDirectExchange())
.with("TestDirectRouting");
}
} 3.實現sendDirectMessage發(fā)送消息請求,由生產者發(fā)送到MQ,TestDirectRouting作為Key,用于精確轉發(fā)。
package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* 功能:
* 作者:程序員ZXY
* 日期:2025/3/8 下午2:12
*/
@RestController
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "Hello MQ!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "OK";
}
}4.此時就可以啟動項目發(fā)送消息了,使用PostMan發(fā)送消息,返回OK說明發(fā)送成功

5.進入http://localhost:15672/,可以看到消息發(fā)送成功,我這里是請求了兩次(也就是發(fā)了兩條消息)。

6.接下來寫消費者的消費過程,新創(chuàng)建一個SpringBoot項目,在yml文件配置如下
server:
port: 8022
spring:
application:
name: rabbitmq-provider
#配置rabbitMq 服務器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest7.消費者配置類,同樣TestDirectRouting用于唯一識別Key
package com.atguigu.demomq2;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 功能:
* 作者:程序員ZXY
* 日期:2025/3/8 下午
*/
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue",true);
}
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange");
}
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}8.消費者 接收消息@RabbitListener(queues = "TestDirectQueue")用于監(jiān)聽指定隊列發(fā)送的消息
package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消費者收到消息 : " + testMessage.toString());
}
}9.啟動消費者,成功接收消息

10.查看MQ控制臺,消息成功被消費

2. 扇出交換機(Fanout Exchange)? ?
路由機制(一個交換機轉發(fā)到多個隊列)
- 廣播模式:忽略
Routing Key,將消息發(fā)送到所有綁定的隊列。 - ?典型場景:消息的全局通知或并行處理。
?應用場景
- ?實時通知系統(tǒng):如用戶注冊成功后,同時發(fā)送郵件、短信、更新緩存。
- ?日志廣播:多個服務訂閱同一日志源,各自獨立處理。
使用扇出交換機實現消息發(fā)送和接收
1.扇出交換機配置
package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutExchangeConfig {
// 定義扇出交換機
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.user.register", true, false);
}
// 定義郵件隊列
@Bean
public Queue emailQueue() {
return new Queue("fanout.user.email", true);
}
// 定義短信隊列
@Bean
public Queue smsQueue() {
return new Queue("fanout.user.sms", true);
}
// 綁定所有隊列到扇出交換機(無需路由鍵)
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
}2.生產者
package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class FanoutUserService {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendFanoutMessage")
public String sendRegisterBroadcast() {
rabbitTemplate.convertAndSend(
"fanout.user.register",
"", // 扇出交換機忽略路由鍵
"message MQ"
);
return "OK Fan";
}
}3.消費者
package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutNotificationConsumer {
@RabbitListener(queues = "fanout.user.email")
public void handleEmail(String message) {
System.out.println("[Email] Received: " + message);
}
@RabbitListener(queues = "fanout.user.sms")
public void handleSms(String message) {
System.out.println("[SMS] Received: " + message);
}
}4.請求并查看消費結果


可以看到一個交換機完成消費兩條消息
?3. 主題交換機(Topic Exchange)?
- ?路由機制 ?模式匹配路由鍵:使用
*(匹配一個單詞)和#(匹配多個單詞)通配符。? - 典型場景:靈活的多條件消息路由。 ?
應用場景
- ?新聞訂閱系統(tǒng):用戶訂閱特定主題(如
news.sports.*、news.tech.#)。? - 設備狀態(tài)監(jiān)控:根據設備類型和區(qū)域路由消息(如
sensor.temperature.room1)。
1.配置主題交換機
package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicExchangeConfig {
// 定義主題交換機
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.news", true, false);
}
// 定義體育新聞隊列
@Bean
public Queue sportsQueue() {
return new Queue("topic.news.sports", true);
}
// 定義科技新聞隊列
@Bean
public Queue techQueue() {
return new Queue("topic.news.tech", true);
}
// 綁定體育隊列:匹配 news.sports.*
@Bean
public Binding sportsBinding() {
return BindingBuilder.bind(sportsQueue())
.to(topicExchange())
.with("news.sports.*");
}
// 綁定科技隊列:匹配 news.tech.#
@Bean
public Binding techBinding() {
return BindingBuilder.bind(techQueue())
.to(topicExchange())
.with("news.tech.#");
}
}2.生產者
package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TopicNewsService {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendTopicMessage1")
public String sendSportsNews() {
rabbitTemplate.convertAndSend(
"topic.news",
"news.sports.football",
"* message:news.sports.football"
);
return "*OK";
}
@GetMapping("/sendTopicMessage2")
public String sendTechNews() {
rabbitTemplate.convertAndSend(
"topic.news",
"news.tech.ai.abc.123456",
"# message:news.tech.ai.abc.123456"
);
return "#OK";
}
}3. 消費者
package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicNewsConsumer {
@RabbitListener(queues = "topic.news.sports")
public void handleSports(String message) {
System.out.println("[Sports] Received: " + message);
}
@RabbitListener(queues = "topic.news.tech")
public void handleTech(String message) {
System.out.println("[Tech] Received: " + message);
}
}4.發(fā)送請求


可以看到消息成功消費,第一個為*通配符,第二個為#通配符

?4. 頭交換機(Headers Exchange)?
?路由機制( 我的理解是一種基于 ?多條件組合 的消息路由機制) ?
- ?基于消息頭(Headers)匹配:忽略
Routing Key,通過鍵值對(Headers)匹配隊列綁定的條件。 - ?匹配規(guī)則:
x-match參數設為all(需全部匹配)或any(匹配任意一個)。
?應用場景
- ?復雜路由邏輯:如根據消息的版本號、語言等元數據路由。?
- 多維度過濾:如同時匹配用戶類型(
user_type: vip)和地理位置(region: asia)。
1.頭交換機配置
package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class HeadersExchangeConfig {
// 定義頭交換機
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers.user", true, false);
}
// 定義VIP用戶隊列
@Bean
public Queue vipQueue() {
return new Queue("headers.user.vip", true);
}
// 綁定VIP隊列,要求同時匹配 userType=vip 和 region=asia
@Bean
public Binding vipBinding() {
Map<String, Object> headers = new HashMap<>();
headers.put("userType", "vip");
headers.put("region", "asia");
return BindingBuilder.bind(vipQueue())
.to(headersExchange())
.whereAll(headers).match(); // whereAll 表示需全部匹配
}
}2.生產者
package com.atguigu.demomq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HeaderUserVipService {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendHeaderMessage")
public String sendVipMessage() {
MessageProperties props = new MessageProperties();
props.setHeader("userType", "vip");
props.setHeader("region", "asia");
Message msg = new Message("HeaderMessage".getBytes(), props);
rabbitTemplate.send("headers.user", "", msg);
return "OK";
}
}3.消費者
package com.atguigu.demomq2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class HeaderUserVipConsumer {
@RabbitListener(queues = "headers.user.vip")
public void handleVip(Message message) {
String body = new String(message.getBody());
System.out.println("[VIP] Received: " + body);
}
}4.PostMan測試


這里僅消費交換機初始化時滿足所有設定條件的消息,我們可以測試一下不滿足條件時發(fā)送消息

消費者不消費消息

總結

需要代碼自己進行測試的 可以Git自取
git clone https://gitee.com/myselfzxy/mq-producer.git
git clone https://gitee.com/myselfzxy/mq-customer.git
到此這篇關于SpringBoot集成MQ,四種交換機的實例的文章就介紹到這了,更多相關SpringBoot集成MQ內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
在IDEA中配置tomcat并創(chuàng)建tomcat項目的圖文教程
這篇文章主要介紹了在IDEA中配置tomcat并創(chuàng)建tomcat項目的圖文教程,需要的朋友可以參考下2020-07-07
基于Eclipse 的JSP/Servlet的開發(fā)環(huán)境的搭建(圖文)
本文將會詳細地展示如何搭建JSP的開發(fā)環(huán)境。本次教程使用的是最新版的Eclipse 2018-09編輯器和最新版的Apache Tomcat v9.0,步驟詳細,內容詳盡,適合零基礎學者作為學習參考2018-12-12
詳解使用spring aop實現業(yè)務層mysql 讀寫分離
本篇文章主要介紹了使用spring aop實現業(yè)務層mysql 讀寫分離,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-01-01

