詳解SpringBoot集成消息隊列的案例應(yīng)用
背景
最近在對公司開發(fā)框架進行優(yōu)化,框架內(nèi)涉及到多處入庫的日志記錄,例如登錄日志/操作日志/訪問日志/業(yè)務(wù)執(zhí)行日志,集成在業(yè)務(wù)代碼中耦合度較高且占用業(yè)務(wù)操作執(zhí)行時間,所以準備集成相關(guān)消息隊列進行代碼解耦
方案規(guī)劃
現(xiàn)有的成熟消息隊列組件非常多,例如RabbitMQ,ActiveMQ,Kafka等,考慮到業(yè)務(wù)并發(fā)量不高且框架已經(jīng)應(yīng)用于多個項目平穩(wěn)運行,準備提供基于Redis的消息隊列和集成ActiveMQ兩種方案,Redis消息隊列的好處是無需額外安裝部署存量項目可平穩(wěn)過度但消息無法持久化可能丟失,ActiveMQ解決方案成熟可以保證消息持久化但是需要實施人員額外掌握操作部署
統(tǒng)一設(shè)計
增加自定義配置指定消息隊列方式
system: #消息隊列方式 redis/activemq messageChannel: redis
定義消息傳輸統(tǒng)一模型
public class MessageModel {
private Class<? extends IMessageReceiver> handleClazz;
private String bodyContent;
private Class bodyClass;
private HashMap extraParam;
public MessageModel(){
extraParam = new HashMap();
}
public Class<? extends IMessageReceiver> getHandleClazz() {
return handleClazz;
}
public void setHandleClazz(Class<? extends IMessageReceiver> handleClazz) {
this.handleClazz = handleClazz;
}
public HashMap getExtraParam() {
return extraParam;
}
public void setExtraParam(HashMap extraParam) {
this.extraParam = extraParam;
}
public String getBodyContent() {
return bodyContent;
}
public void setBodyContent(String bodyContent) {
this.bodyContent = bodyContent;
}
public Class getBodyClass() {
return bodyClass;
}
public void setBodyClass(Class bodyClass) {
this.bodyClass = bodyClass;
}
}定義標準消息處理接口
public interface IMessageReceiver {
void handleMessage(Object bodyObject, HashMap extraParam);
}
定義統(tǒng)一對外發(fā)送消息工具類
@Component
public class MessageUtil {
@Autowired
private SystemConfig systemConfig;
@Autowired
private RedisUtil redisUtil;
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
public void sendMessage(Object messageBody, Class<? extends IMessageReceiver> handleClass, HashMap<String,Object> extraParam) {
MessageModel messageModel = new MessageModel();
messageModel.setHandleClazz(handleClass);
messageModel.setBodyClass(messageBody.getClass());
messageModel.setBodyContent(JSON.toJSONString(messageBody));
if (extraParam != null) {
for (String key:extraParam.keySet()) {
messageModel.getExtraParam().put(key,extraParam.get(key));
}
}
if(systemConfig.getMessageChannel().equals("redis")){
redisUtil.sendMessage("message", JSON.toJSON(messageModel));
}else{
jmsMessagingTemplate.convertAndSend("message",JSON.toJSONString(messageModel));
}
}
}
集成Redis消息隊列
pom配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
連接配置
spring:
redis:
host: localhost
port: 6379
password:
操作工具類
@Autowired
private RedisTemplate redisTemplate;
public void sendMessage(String channel, Object message) {
redisTemplate.convertAndSend(channel, message);
}
消息處理
@Component
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true)
public class RedisMessageReceiver {
public void receiveMessage(String message) {
MessageModel messageModel = JSON.parseObject(message, MessageModel.class);
IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz());
receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam());
}
}配置注冊
@Configuration
public class MessageCenter {
@Bean
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true)
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多個 messageListener,配置不同的交換機
container.addMessageListener(listenerAdapter, new PatternTopic("message"));
return container;
}
/**
* 消息監(jiān)聽器適配器,綁定消息處理器,利用反射技術(shù)調(diào)用消息處理器的業(yè)務(wù)方法
*
* @param receiver
* @return
*/
@Bean
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true)
MessageListenerAdapter listenerAdapter(RedisMessageReceiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}集成ActiveMQ消息隊列
pom配置
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
注意:jdk1.8對應(yīng)版本5.15.0
連接配置
spring:
activemq:
broker-url: tcp://127.0.0.1:61616 #MQ服務(wù)器地址
user: admin
password: admin
pool:
enabled: true
消息處理
@Component
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false)
public class ActiveMQMessageReceiver {
@JmsListener(destination = "message", containerFactory = "customQueueListener")
public void handleMessage(String message) {
MessageModel messageModel = JSON.parseObject(message, MessageModel.class);
IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz());
receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam());
}
}
配置注冊
@Configuration
@EnableJms
public class MessageCenter {
@Bean(name = "customQueueListener")
@ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false)
public JmsListenerContainerFactory<?> customQueueListener(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactory);
//重連間隔時間
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
//連接數(shù)
factory.setConcurrency("5-10");
//指定任務(wù)線程池
factory.setTaskExecutor(new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()));
return factory;
}
}
使用示例
消息處理
@Service
public class RequestLogMessageReceiver implements IMessageReceiver{
@Autowired
private F_RequestLogService requestLogService;
@Override
public void handleMessage(Object bodyObject, HashMap extraParam) {
F_RequestLogDO requestLogDO = (F_RequestLogDO)bodyObject;
requestLogService.insert(requestLogDO);
}
}
發(fā)送消息
@AutoWired private MessageUtil messageUtil; messageUtil.sendMessage(requestLogDO,RequestLogMessageReceiver.class,null);
到此這篇關(guān)于詳解SpringBoot集成消息隊列的案例應(yīng)用的文章就介紹到這了,更多相關(guān)SpringBoot消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringSecurity實現(xiàn)圖形驗證碼功能的實例代碼
Spring Security 的前身是 Acegi Security ,是 Spring 項目組中用來提供安全認證服務(wù)的框架。這篇文章主要介紹了SpringSecurity實現(xiàn)圖形驗證碼功能,需要的朋友可以參考下2018-10-10
基于Java8并行流(parallelStream)的注意點
這篇文章主要介紹了Java8并行流(parallelStream)的注意點,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
Java8新特性之JavaFX 8_動力節(jié)點Java學院整理
這篇文章主要介紹了Java8新特性之JavaFX 8的相關(guān)知識,非常不錯,具有參考借鑒價值,需要的朋友參考下吧2017-06-06
springsecurity中http.permitall與web.ignoring的區(qū)別說明
這篇文章主要介紹了springsecurity中http.permitall與web.ignoring的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
Spring整合MyBatis(Maven+MySQL)圖文教程詳解
這篇文章主要介紹了Spring整合MyBatis(Maven+MySQL)圖文教程詳解的相關(guān)資料,需要的朋友可以參考下2016-07-07
mybatis-plus enum實現(xiàn)枚舉類型自動轉(zhuǎn)換
本文主要介紹了mybatis-plus enum實現(xiàn)枚舉類型自動轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2024-07-07

