SpringBoot之RabbitMQ的使用方法
一 、RabbitMQ的介紹
RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件,消息中間件的工作過程可以用生產(chǎn)者消費(fèi)者模型來表示.即,生產(chǎn)者不斷的向消息隊列發(fā)送信息,而消費(fèi)者從消息隊列中消費(fèi)信息.具體過程如下:

從上圖可看出,對于消息隊列來說,生產(chǎn)者、消息隊列、消費(fèi)者是最重要的三個概念,生產(chǎn)者發(fā)消息到消息隊列中去,消費(fèi)者監(jiān)聽指定的消息隊列,并且當(dāng)消息隊列收到消息之后,接收消息隊列傳來的消息,并且給予相應(yīng)的處理。消息隊列常用于分布式系統(tǒng)之間互相信息的傳遞。
對于RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(jī)(Exchange)。它使得生產(chǎn)者和消息隊列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機(jī),而交換機(jī)則根據(jù)調(diào)度策略把相應(yīng)的消息轉(zhuǎn)發(fā)給對應(yīng)的消息隊列。
交換機(jī)的主要作用是接收相應(yīng)的消息并且綁定到指定的隊列。交換機(jī)有四種類型,分別為Direct、topic、headers、Fanout。
Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡單的模式。即創(chuàng)建消息隊列的時候,指定一個BindingKey。當(dāng)發(fā)送者發(fā)送消息的時候,指定對應(yīng)的Key。當(dāng)Key和消息隊列的BindingKey一致的時候,消息將會被發(fā)送到該消息隊列中。
topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時候,只有指定的Key和該模式相匹配的時候,消息才會被發(fā)送到該消息隊列中。
headers也是根據(jù)一個規(guī)則進(jìn)行匹配,在消息隊列和交換機(jī)綁定的時候會指定一組鍵值對規(guī)則,而發(fā)送消息的時候也會指定一組鍵值對規(guī)則,當(dāng)兩組鍵值對規(guī)則相匹配的時候,消息會被發(fā)送到匹配的消息隊列中。
Fanout是路由廣播的形式,將會把消息發(fā)給綁定它的全部隊列,即便設(shè)置了key,也會被忽略。
二 、SpringBoot整合RabbitMQ(Direct模式)
SpringBoot整合RabbitMQ非常簡單,首先還是pom.xml引入依賴。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.properties中配置RabbitMQ相關(guān)的信息,并首先啟動了RabbitMQ實例,并創(chuàng)建兩個queue。

spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin
配置Queue(消息隊列),由于采用的是Direct模式,需要在配置Queue的時候指定一個鍵,使其和交換機(jī)綁定。
@Configuration
public class RabbitConfig {
@Bean
public org.springframework.amqp.core.Queue Queue() {
return new org.springframework.amqp.core.Queue("hello");
}
}
接著就可以發(fā)送消息啦。在SpringBoot中,我們使用AmqpTemplate去發(fā)送消息。代碼如下:
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(int index) {
String context = "hello Queue "+index + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
生產(chǎn)者發(fā)送消息之后就需要消費(fèi)者接收消息。這里定義了兩個消息消費(fèi)者,用來模擬生產(chǎn)者與消費(fèi)者一對多的關(guān)系。
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1 : " + hello);
}
}
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver2 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver2 : " + hello);
}
}
在單元測試中模擬發(fā)送消息,批量發(fā)送10條消息,兩個接收者分別接收了5條消息。
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
for(int i=0;i<10;i++)
{
helloSender.send(i);
}
}

實際上RabbitMQ還可以支持發(fā)送對象,當(dāng)然由于涉及到序列化和反序列化,該對象要實現(xiàn)Serilizable接口。這里定義了User對象,用來做發(fā)送消息內(nèi)容。
import java.io.Serializable;
public class User implements Serializable{
private String name;
private String pwd;
public String getPwd() {
return pwd;
}
public void setPwd(String pwd) {
this.pwd = pwd;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public User(String name, String pwd) {
this.name = name;
this.pwd = pwd;
}
@Override
public String toString() {
return "User{" +"name='" + name + '\'' +", pwd='" + pwd + '\'' +'}';
}
}
在生產(chǎn)者中發(fā)送User對象。
@Component
public class ModelSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendModel(User user) {
System.out.println("Sender object: " + user.toString());
this.rabbitTemplate.convertAndSend("object", user);
}
}
在消費(fèi)者中接收User對象。
@Component
@RabbitListener(queues = "object")
public class ModelRecevicer {
@RabbitHandler
public void process(User user) {
System.out.println("Receiver object : " + user);
}
}
在單元測試中注入ModelSender 對象,實例化User對象,然后發(fā)送。
@Autowired
private ModelSender modelSender;
@Test
public void model() throws Exception {
User user=new User("abc","123");
modelSender.sendModel(user);
}

三 、SpringBoot整合RabbitMQ(Topic轉(zhuǎn)發(fā)模式)
首先需要在RabbitMQ服務(wù)端創(chuàng)建交換機(jī)topicExchange,并綁定兩個queue:topic.message、topic.messages。

新建TopicRabbitConfig,設(shè)置對應(yīng)的queue與binding。
@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
創(chuàng)建消息生產(chǎn)者,在TopicSender中發(fā)送3個消息。
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, i am message all";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
}
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
}
}
生產(chǎn)者發(fā)送消息,這里創(chuàng)建了兩個接收消息的消費(fèi)者。
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver1 : " + message);
}
}
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " + message);
}
}
在單元測試中注入TopicSender,利用topicSender 發(fā)送消息。
@Autowired
private TopicSender topicSender;
@Test
public void topicSender() throws Exception {
topicSender.send();
topicSender.send1();
topicSender.send2();
}

從上面的輸出結(jié)果可以看到,Topic Receiver2 匹配到了所有消息,Topic Receiver1只匹配到了1個消息。
四 、SpringBoot整合RabbitMQ(Fanout Exchange形式)
Fanout Exchange形式又叫廣播形式,因此我們發(fā)送到路由器的消息會使得綁定到該路由器的每一個Queue接收到消息。首先需要在RabbitMQ服務(wù)端創(chuàng)建交換機(jī)fanoutExchange,并綁定三個queue:fanout.A、fanout.B、fanout.C。

與Topic類似,新建FanoutRabbitConfig,綁定交換機(jī)和隊列。
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
創(chuàng)建消息生產(chǎn)者,在FanoutSender中發(fā)送消息。
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
System.out.println("FanoutSender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
}
然后創(chuàng)建了3個接收者FanoutReceiverA、FanoutReceiverB、FanoutReceiverC。
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver A : " + message);
}
}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver B: " + message);
}
}
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver C: " + message);
}
}
在單元測試中注入消息發(fā)送者,發(fā)送消息。
@Autowired
private FanoutSender fanoutSender;
@Test
public void fanoutSender() throws Exception {
fanoutSender.send();
}
從下圖可以看到3個隊列都接收到了消息。

本章節(jié)創(chuàng)建的類比較多,下圖為本章節(jié)的結(jié)構(gòu),也可以直接查看demo源碼了解。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- RabbitMQ在Spring Boot中的使用步驟
- Springboot RabbitMQ 消息隊列使用示例詳解
- Spring Boot中RabbitMQ自動配置的介紹、原理和使用方法
- 詳解SpringBoot中使用RabbitMQ的RPC功能
- SpringMVC和rabbitmq集成的使用案例
- SpringBoot+RabbitMq具體使用的幾種姿勢
- 詳解Spring Cloud Stream使用延遲消息實現(xiàn)定時任務(wù)(RabbitMQ)
- spring boot使用RabbitMQ實現(xiàn)topic 主題
- Spring3?中?RabbitMQ?的使用與常見場景分析
相關(guān)文章
Mybatis3中方法返回生成的主鍵:XML,@SelectKey,@Options詳解
這篇文章主要介紹了Mybatis3中方法返回生成的主鍵:XML,@SelectKey,@Options,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
springboot Jpa多數(shù)據(jù)源(不同庫)配置過程
這篇文章主要介紹了springboot Jpa多數(shù)據(jù)源(不同庫)配置過程,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05
Java多線程 Guarded Suspension設(shè)計模式
這篇文章主要介紹了Java多線程 Guarded Suspension設(shè)計模式,Guarded Suspension意為保護(hù)暫停,其核心思想是僅當(dāng)服務(wù)進(jìn)程準(zhǔn)備好時,才提供服務(wù),文章圍繞Java多線程 Guarded Suspension展開內(nèi)容,需要的朋友可以參考一下2021-10-10
SpringBoot 2.6.x整合springfox 3.0報錯問題及解決方案
這篇文章主要介紹了SpringBoot 2.6.x整合springfox 3.0報錯問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01

