SpringBoot整合RabbitMQ消息隊(duì)列的完整步驟
SpringBoot整合RabbitMQ
主要實(shí)現(xiàn)RabbitMQ以下三種消息隊(duì)列:
- 簡(jiǎn)單消息隊(duì)列(演示direct模式)
- 基于RabbitMQ特性的延時(shí)消息隊(duì)列
- 基于RabbitMQ相關(guān)插件的延時(shí)消息隊(duì)列
公共資源
1. 引入pom依賴(lài)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置yml文件
基于上篇《RabbitMQ安裝與配置》實(shí)現(xiàn)的情況下,進(jìn)行基礎(chǔ)配置。
spring:
rabbitmq:
host: 121.5.168.31
port: 5672 # 默認(rèn)可省略
virtual-host: /*** # 虛擬主機(jī)
username: *** # 用戶(hù)名
password: *** # 用戶(hù)密碼
# 開(kāi)啟投遞成功回調(diào) P -> Exchange
publisher-confirm-type: correlated
# 開(kāi)啟投遞消息到隊(duì)列失敗回調(diào) Exchange -> Queue
publisher-returns: true
# 開(kāi)啟手動(dòng)ACK確認(rèn)模式 Queue -> C
listener:
simple:
acknowledge-mode: manual # 代表手動(dòng)ACK確認(rèn)
# 一些基本參數(shù)的設(shè)置
concurrency: 3
prefetch: 15
retry:
enabled: true
max-attempts: 5
max-concurrency: 10
3. 公共Constants類(lèi)
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/23 15:28
*/
public class Constants {
/**
* 第一個(gè)配置Queue,Exchange,Key(非注解方式)
*/
public final static String HORSE_SIMPLE_QUEUE = "HORSE_SIMPLE_QUEUE";
public final static String HORSE_SIMPLE_EXCHANGE = "HORSE_SIMPLE_EXCHANGE";
public final static String HORSE_SIMPLE_KEY = "HORSE_SIMPLE_KEY";
/**
* 第二個(gè)配置Queue,Exchange,Key(注解方式)
*/
public final static String HORSE_ANNOTATION_QUEUE = "HORSE_ANNOTATION_QUEUE";
public final static String HORSE_ANNOTATION_EXCHANGE = "HORSE_ANNOTATION_EXCHANGE";
public final static String HORSE_ANNOTATION_KEY = "HORSE_ANNOTATION_KEY";
//************************************延時(shí)消息隊(duì)列配置信息**************************
/**
* 延時(shí)隊(duì)列信息配置
*/
public final static String HORSE_DELAY_EXCHANGE = "HORSE_DELAY_EXCHANGE";
public final static String HORSE_DELAY_QUEUE = "HORSE_DELAY_QUEUE";
public final static String HORSE_DELAY_KEY = "HORSE_DELAY_KEY";
/**
* 死信隊(duì)列
*/
public final static String HORSE_DEAD_EXCHANGE = "HORSE_DEAD_EXCHANGE";
public final static String HORSE_DEAD_QUEUE = "HORSE_DEAD_QUEUE";
public final static String HORSE_DEAD_KEY = "HORSE_DEAD_KEY";
//**************************************延時(shí)消息隊(duì)列配置信息(插件版)******************************
/**
* 新延時(shí)隊(duì)列信息配置
*/
public final static String HORSE_PLUGIN_EXCHANGE = "HORSE_PLUGIN_EXCHANGE";
public final static String HORSE_PLUGIN_QUEUE = "HORSE_PLUGIN_QUEUE";
public final static String HORSE_PLUGIN_KEY = "HORSE_PLUGIN_KEY";
}
簡(jiǎn)單消息隊(duì)列(direct模式)
4. RabbitTemplate模板配置
主要定義消息投遞Exchange成功回調(diào)函數(shù)和消息從Exchange投遞到消息隊(duì)列失敗的回調(diào)函數(shù)。
package com.topsun.rabbit;
import com.sun.org.apache.xpath.internal.operations.Bool;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/23 14:17
*/
@Configuration
public class RabbitConfig {
private static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 觸發(fā)setReturnCallback回調(diào)必須設(shè)置mandatory=true,否則Exchange沒(méi)有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
rabbitTemplate.setMandatory(Boolean.TRUE);
// 設(shè)置序列化機(jī)制
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息由投遞到Exchange中時(shí)觸發(fā)的回調(diào)
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
logger.info("消息發(fā)送到Exchange情況反饋:唯一標(biāo)識(shí):correlationData={},消息確認(rèn):ack={},原因:cause={}",
correlationData, ack, cause)
);
// 消息由Exchange發(fā)送到Queue時(shí)失敗觸發(fā)的回調(diào)
rabbitTemplate.setReturnsCallback((returnedMessage) -> {
// 如果是插件形式實(shí)現(xiàn)的延時(shí)隊(duì)列,則直接返回
// 原因: 因?yàn)榘l(fā)送方確實(shí)沒(méi)有投遞到隊(duì)列上,只是在交換器上暫存,等過(guò)期時(shí)間到了 才會(huì)發(fā)往隊(duì)列,從而實(shí)現(xiàn)延時(shí)隊(duì)列的操作
if (Constants.HORSE_PLUGIN_EXCHANGE.equals(returnedMessage.getExchange())) {
return;
}
logger.warn("消息由Exchange發(fā)送到Queue時(shí)失敗:message={},replyCode={},replyText={},exchange={},rountingKey={}",
returnedMessage.getMessage(), returnedMessage.getReplyText(), returnedMessage.getReplyText(),
returnedMessage.getExchange(), returnedMessage.getRoutingKey());
});
return rabbitTemplate;
}
//*******************************************直接配置綁定關(guān)系*****************************************
/**
* 聲明隊(duì)列
*
* @return
*/
@Bean
public Queue horseQueue() {
return new Queue(Constants.HORSE_SIMPLE_QUEUE, Boolean.TRUE);
}
/**
* 聲明指定模式交換機(jī)
*
* @return
*/
@Bean
public DirectExchange horseExchange() {
return new DirectExchange(Constants.HORSE_SIMPLE_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
}
/**
* 綁定交換機(jī),隊(duì)列,路由Key
*
* @return
*/
@Bean
public Binding horseBinding() {
return BindingBuilder.bind(horseQueue()).to(horseExchange()).with(Constants.HORSE_SIMPLE_KEY);
}
}
5. 定義消息監(jiān)聽(tīng)器
基于 @RabbitListenerzi注解,實(shí)現(xiàn)自定義消息監(jiān)聽(tīng)器。主要有兩種實(shí)現(xiàn)方式:
- 如果在配置類(lèi)中聲明了Queue、Excehange以及他們直接的綁定,這里直接指定隊(duì)列進(jìn)行消息監(jiān)聽(tīng)
- 如果前面什么也沒(méi)做,這里可以直接用注解的方式進(jìn)行綁定實(shí)現(xiàn)消息監(jiān)聽(tīng)
package com.topsun.rabbit;
import com.rabbitmq.client.Channel;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/23 14:58
*/
@Component
public class MsgListener {
private static Logger logger = LoggerFactory.getLogger(MsgListener.class);
/**
* 配置類(lèi)中已經(jīng)完成綁定,這里直接根據(jù)隊(duì)列值接收
*
* @param message
* @param channel
* @param msg
*/
@RabbitListenerzi(queues = Constants.HORSE_SIMPLE_QUEUE)
public void customListener(Message message, Channel channel, String msg) {
// 獲取每條消息唯一標(biāo)識(shí)(用于手動(dòng)ACK確認(rèn))
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> customListener接收" + msg);
// 手動(dòng)ACK確認(rèn)
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> 消息接收失敗: {}", tag);
}
}
/**
* 根據(jù)注解的形式進(jìn)行綁定接收
*
* @param message
* @param channel
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = Constants.HORSE_ANNOTATION_QUEUE, durable = "true"),
exchange = @Exchange(value = Constants.HORSE_ANNOTATION_EXCHANGE, ignoreDeclarationExceptions = "true"),
key = {Constants.HORSE_ANNOTATION_KEY}
))
public void annotationListener(Message message, Channel channel, String msg) {
// 獲取每條消息唯一標(biāo)識(shí)(用于手動(dòng)ACK確認(rèn))
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> annotationListener接收" + msg);
// 手動(dòng)ACK確認(rèn)
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> 消息接收失敗: {}", tag);
}
}
}
6. 測(cè)試接口
這里發(fā)送100條消息:
- 奇數(shù)條到非注解方式的消息監(jiān)聽(tīng)器
- 偶數(shù)條到注解式消息監(jiān)聽(tīng)器
@GetMapping("/rabbit")
public void sendMsg() {
for (int i = 1; i <= 100; i++) {
String msg = "第" + i + "條消息";
logger.info("==> 發(fā)送" + msg);
if (i % 2 == 1) {
rabbitTemplate.convertAndSend(Constants.HORSE_SIMPLE_EXCHANGE, Constants.HORSE_SIMPLE_KEY, msg, new CorrelationData(String.valueOf(i)));
} else {
rabbitTemplate.convertAndSend(Constants.HORSE_ANNOTATION_EXCHANGE, Constants.HORSE_ANNOTATION_KEY, msg, new CorrelationData(String.valueOf(i)));
}
}
}
結(jié)果:自行測(cè)試過(guò),非常成功:smile::smile::smile:
延時(shí)消息隊(duì)列
原理:生產(chǎn)者生產(chǎn)一條延時(shí)消息,根據(jù)需要延時(shí)時(shí)間的不同,利用不同的routingkey將消息路由到不同的延時(shí)隊(duì)列,每個(gè)隊(duì)列都設(shè)置了不同的TTL屬性,并綁定在同一個(gè)死信交換機(jī)中,消息過(guò)期后,根據(jù)routingkey的不同,又會(huì)被路由到不同的死信隊(duì)列中,消費(fèi)者只需要監(jiān)聽(tīng)對(duì)應(yīng)的死信隊(duì)列進(jìn)行處理即可。
7. 配置綁定相關(guān)信息
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/24 14:22
*/
@Configuration
public class DelayRabbitConfig {
private static Logger logger = LoggerFactory.getLogger(DelayRabbitConfig.class);
/**
* 聲明延時(shí)隊(duì)列交換機(jī)
*
* @return
*/
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(Constants.HORSE_DELAY_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
}
/**
* 聲明死信隊(duì)列交換機(jī)
*
* @return
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(Constants.HORSE_DEAD_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
}
/**
* 聲明延時(shí)隊(duì)列 延時(shí)10s(單位:ms),并將延時(shí)隊(duì)列綁定到對(duì)應(yīng)的死信交換機(jī)和路由Key
*
* @return
*/
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", Constants.HORSE_DEAD_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", Constants.HORSE_DEAD_KEY);
// x-message-ttl 聲明隊(duì)列的TTL(過(guò)期時(shí)間)
// 可以在這里直接寫(xiě)死,也可以進(jìn)行動(dòng)態(tài)的設(shè)置(推薦動(dòng)態(tài)設(shè)置)
// args.put("x-message-ttl", 10000);
return QueueBuilder.durable(Constants.HORSE_DELAY_QUEUE).withArguments(args).build();
}
/**
* 聲明死信隊(duì)列
*
* @return
*/
@Bean
public Queue deadQueue() {
return new Queue(Constants.HORSE_DEAD_QUEUE, Boolean.TRUE);
}
/**
* 延時(shí)隊(duì)列綁定管理
*
* @return
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(Constants.HORSE_DELAY_KEY);
}
/**
* 死信隊(duì)列綁定管理
*
* @return
*/
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(Constants.HORSE_DEAD_KEY);
}
//**********************************延時(shí)消息隊(duì)列配置信息(插件版)************************************
@Bean
public Queue pluginQueue() {
return new Queue(Constants.HORSE_PLUGIN_QUEUE);
}
/**
* 設(shè)置延時(shí)隊(duì)列的交換機(jī),必須是 CustomExchange 類(lèi)型交換機(jī)
* 參數(shù)必須,不能改變
* @return
*/
@Bean
public CustomExchange customPluginExchange() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "direct");
return new CustomExchange(Constants.HORSE_PLUGIN_EXCHANGE, "x-delayed-message", Boolean.TRUE, Boolean.FALSE, args);
}
@Bean
public Binding pluginBinding() {
return BindingBuilder.bind(pluginQueue()).to(customPluginExchange()).with(Constants.HORSE_PLUGIN_KEY).noargs();
}
}
8. 定義延時(shí)監(jiān)聽(tīng)器
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/24 14:51
*/
@Component
public class DelayMsgListener {
private static Logger logger = LoggerFactory.getLogger(DelayMsgListener.class);
/**
* 監(jiān)聽(tīng)死信隊(duì)列
*
* @param message
* @param channel
* @param msg
*/
@RabbitListener(queues = Constants.HORSE_DEAD_QUEUE)
public void consumeDeadListener(Message message, Channel channel, String msg) {
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> consumeDeadListener接收" + msg);
// 手動(dòng)ACK確認(rèn)
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> 消息接收失敗: {}", tag);
}
}
/**
* 監(jiān)聽(tīng)延時(shí)隊(duì)列(插件版)
*
* @param message
* @param channel
* @param msg
*/
@RabbitListener(queues = Constants.HORSE_PLUGIN_QUEUE)
public void consumePluginListener(Message message, Channel channel, String msg) {
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> consumePluginListener" + msg);
// 手動(dòng)ACK確認(rèn)
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> 消息接收失敗: {}", tag);
}
}
}
9. 測(cè)試接口
// 基于特性的延時(shí)隊(duì)列
@GetMapping("/delay/rabbit")
public void delayMsg(@RequestParam("expire") Long expire) {
for (int i = 1; i <= 10; i++) {
String msg = "第" + i + "條消息";
logger.info("==> 發(fā)送" + msg);
// 這里可以動(dòng)態(tài)的設(shè)置過(guò)期時(shí)間
rabbitTemplate.convertAndSend(Constants.HORSE_DELAY_EXCHANGE, Constants.HORSE_DELAY_KEY, msg,
message -> {
message.getMessageProperties().setExpiration(String.valueOf(expire));
return message;
},
new CorrelationData(String.valueOf(i)));
}
}
// 基于插件的延時(shí)隊(duì)列
@GetMapping("/delay/plugin")
public void delayPluginMsg(@RequestParam("expire") Integer expire) {
for (int i = 1; i <= 10; i++) {
String msg = "第" + i + "條消息";
logger.info("==> 發(fā)送" + msg);
// 動(dòng)態(tài)設(shè)置過(guò)期時(shí)間
rabbitTemplate.convertAndSend(Constants.HORSE_PLUGIN_EXCHANGE, Constants.HORSE_PLUGIN_KEY, msg, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setDelay(expire);
return message;
}, new CorrelationData(String.valueOf(i)));
}
}
結(jié)果:你懂的:scream_cat::scream_cat::scream_cat:
RabbitMQ的基礎(chǔ)使用演示到此結(jié)束。
總結(jié)
到此這篇關(guān)于SpringBoot整合RabbitMQ消息隊(duì)列的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
web.xml詳解_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章給大家詳細(xì)介紹了web.xml的相關(guān)知識(shí),需要的朋友可以參考下2017-07-07
MyBatis傳入List集合查詢(xún)數(shù)據(jù)問(wèn)題
這篇文章主要介紹了MyBatis傳入List集合查詢(xún)數(shù)據(jù)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
使用maven創(chuàng)建普通項(xiàng)目命令行程序詳解
大部分使用maven創(chuàng)建的是web項(xiàng)目,這里使用maven創(chuàng)建一個(gè)命令行程序,目的是讓大家了解maven特點(diǎn)和使用方式,有需要的朋友可以借鑒參考下2021-10-10
Java后端調(diào)用微信支付和支付寶支付的詳細(xì)步驟
這篇文章主要介紹了Java后端如何調(diào)用微信支付和支付寶支付,涵蓋了基本概念、配置步驟、代碼示例以及注意事項(xiàng),文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2025-04-04
Java啟動(dòng)Tomcat的實(shí)現(xiàn)步驟
本文主要介紹了Java啟動(dòng)Tomcat的實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05
SpringBoot使用SchedulingConfigurer實(shí)現(xiàn)多個(gè)定時(shí)任務(wù)多機(jī)器部署問(wèn)題(推薦)
這篇文章主要介紹了SpringBoot使用SchedulingConfigurer實(shí)現(xiàn)多個(gè)定時(shí)任務(wù)多機(jī)器部署問(wèn)題,定時(shí)任務(wù)多機(jī)器部署解決方案,方式一拆分,單獨(dú)拆分出來(lái),單獨(dú)跑一個(gè)應(yīng)用,方式二是基于aop攔截處理(搶占執(zhí)行),只要有一個(gè)執(zhí)行,其它都不執(zhí)行,需要的朋友可以參考下2023-01-01

