SpringBoot整合Redis實現(xiàn)消息發(fā)布與訂閱的示例代碼
當(dāng)我們在多個集群應(yīng)用中使用到本地緩存時,在數(shù)據(jù)庫數(shù)據(jù)得到更新后,為保持各個副本當(dāng)前被修改的數(shù)據(jù)與數(shù)據(jù)庫數(shù)據(jù)保持同步,在數(shù)據(jù)被操作后向其他集群應(yīng)用發(fā)出被更新數(shù)據(jù)的通知,使其刪除;下次當(dāng)其他應(yīng)用請求該被更新的數(shù)據(jù)時,應(yīng)用會到數(shù)據(jù)庫去取,也就是最新的數(shù)據(jù),從而使得被更新數(shù)據(jù)與數(shù)據(jù)庫保持同步!
能實現(xiàn)發(fā)送與接收信息的中間介有很多,比如:RocketMQ、RabbitMQ、ActiveMQ、Kafka等,本次主要簡單介紹Redis的推送與訂閱功能并集成Spring Boot的實現(xiàn)。
1.添加SpringBoot 集成Redis maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.配置Redis配置 RedisConfig.java
@Configuration
public class RedisConfig {
@Value("${redis.server.nodes}")
private String redisServerNodes;
@Value("${redis.server.password}")
private String redisServerPassword;
//Redis集群配置,單機的redis注釋掉該方法,在application配置文件里面配置就可以了
@Bean
public RedisClusterConfiguration getRedisClusterConfiguration() {
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
String[] serverArray = redisServerNodes.split(",");
Set<RedisNode> nodes = new HashSet<RedisNode>();
for (String ipPort : serverArray) {
String[] ipAndPort = ipPort.split(":");
nodes.add(new RedisNode(ipAndPort[0].trim(), Integer.parseInt(ipAndPort[1])));
}
redisClusterConfiguration.setClusterNodes(nodes);
RedisPassword pwd = RedisPassword.of(redisServerPassword);
redisClusterConfiguration.setPassword(pwd);
return redisClusterConfiguration;
}
//指定redisTemplate類型,如下為<String, Object>
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
// 使用JSON格式序列化對象,對緩存數(shù)據(jù)key和value進行轉(zhuǎn)換
Jackson2JsonRedisSerializer<Object> jacksonSeial = new Jackson2JsonRedisSerializer<>(Object.class);
// 解決查詢緩存轉(zhuǎn)換異常的問題
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jacksonSeial.setObjectMapper(objectMapper);
// 設(shè)置RedisTemplate模板API的序列化方式為JSON
template.setDefaultSerializer(jacksonSeial);
return template;
}
/**
* Redis消息監(jiān)聽器容器
* 這個容器加載了RedisConnectionFactory和消息監(jiān)聽器
* 可添加多個不同話題的redis監(jiān)聽器,需要將消息監(jiān)聽器和消息頻道綁定,
* 通過反射調(diào)用消息訂閱處理器的相關(guān)方法進行業(yè)務(wù)處理
*
* @param redisConnectionFactory 連接工廠
* @param listener Redis消息監(jiān)聽器
* @param MessageListenerAdapter Redis消息監(jiān)聽適配器
* @return RedisMessageListenerContainer 消息監(jiān)聽容器
*/
@Bean
@SuppressWarnings("all")
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
RedisMessageListener listener,
MessageListenerAdapter adapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
// 所有的訂閱消息,都需要在這里進行注冊綁定
// new PatternTopic(TOPIC_NAME1) 表示發(fā)布信息的頻道
// 可以添加多個頻道以及配置不同的頻道
container.addMessageListener(listener, new PatternTopic(SystemConstants.TOPIC_NAME1));
container.addMessageListener(adapter, new PatternTopic(SystemConstants.TOPIC_NAME2));
/**
* 設(shè)置序列化對象
* 特別注意:1. 發(fā)布的時候和訂閱方都需要設(shè)置序列化
* 2. 設(shè)置序列化對象必須放在 {加入消息監(jiān)聽器} 這步后面,不然接收器接收不到消息
*/
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
container.setTopicSerializer(seria);
return container;
}
/**
* 這個地方是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調(diào)用“receiveMessage”
* 也有好幾個重載方法,這邊默認調(diào)用處理器的方法 叫OnMessage
*
* @param redisMessageReceiver 消息接收對象
* @return 消息監(jiān)聽適配器
*/
@Bean
public MessageListenerAdapter listenerAdapter(RedisMessageReceiver redisMessageReceiver) {
MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "onMessage");
Jackson2JsonRedisSerializer<Object> seria = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
receiveMessage.setSerializer(seria);
return receiveMessage;
}
}3.Redis的訂閱主要有兩種實現(xiàn)方式
方式一:編寫Redis監(jiān)聽類RedisMessageListener,實現(xiàn)Redis的監(jiān)聽接口MessageListener,并重寫onMessage方法
方式二:編寫Redis消息監(jiān)聽適配器類,并在RedisConfig.java中配置消息監(jiān)聽適配器bean
方式一 與 方式二 主要是實現(xiàn)訂閱Redis推送的消息后的具體操作,這兩種方式可以同時使用來訂閱多個頻道里的消息
//方式一:
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private CacheManager cacheManager;
@Override
public void onMessage(Message message, byte[] pattern) {
// 接收的topic
log.info("接收消息頻道:" + new String(pattern));
//序列化對象(特別注意:發(fā)布的時候需要設(shè)置序列化;訂閱方也需要設(shè)置序列化)
MessageDto<?> messageDto = (MessageDto<?>) redisTemplate.getValueSerializer().deserialize(message.getBody());
//MessageDto<T>為自己編寫的一個消息對象類(如自定義有:String data,String title,T content 等屬性)
log.info("接收消息內(nèi)容:{}", messageDto);
//根據(jù)消息內(nèi)容做具體業(yè)務(wù)邏輯。。。。。。。。。
//。。。。。。。。。。。。。。。。。。。。。。
}
}//方式二
@Slf4j
@Component
public class RedisMessageReceiver {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
/**
* 方法命名規(guī)則必須為function(Object messageDto) / function(Object messageDto,String topic)
* @param messageDto 消息對象
* @param topic 消息頻道名稱
*/
public void onMessage(Object messageDto , String topic) {
// 接收消息頻道
log.info("接收消息頻道:" + topic);
//接收消息內(nèi)容
log.info("接收消息內(nèi)容:{}",messageDto);
}
}4.編寫Redis消息的推送工具類,在需要推送消息的地方使用來向Redis推送消息(如:操作數(shù)據(jù)的地方)
@Slf4j
@Component
public class RedisMessageUtils {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
/**
* 向通道發(fā)布消息
*/
public void sendMessage(String topic, Object message) {
if (!StringUtils.hasText(topic)) {
return;
}
try {
redisTemplate.convertAndSend(topic, message);
log.info("發(fā)送消息成功,topic:{},message:{}", topic, message);
} catch (Exception e) {
log.info("發(fā)送消息失敗,topic:{},message:{}", topic, message);
e.printStackTrace();
}
}
}5.使用
@RestController
@RequestMapping("/user")
public class UserController{
@Autowired
private UserService userService;
@PostMapping("/getUsers")
public List<User> queryUsers(@RequestBody UserDto userDto){
List<User> users=userService.queryUsers(userDto);
//發(fā)送測試消息
RedisMessageUtils.sendMessage(SystemConstants.TOPIC_NAME2, new MessageDto());
return users;
}
}成功示例:
2099-12-31 23:59:59.999 [http-nio-8888-exec-1] INFO com.xxx.yyy.util.RedisMessageUtils : 發(fā)送消息成功,topic:TOPIC2,message:MessageDto(data=null, title=null, content=null)
2099-12-31 23:59:59.999 [container-2] INFO com.xxx.yyy.zzz.RedisMessageReceiver : 接收消息頻道:TOPIC2
2099-12-31 23:59:59.999 [container-2] INFO com.xxx.yyy.zzz.RedisMessageReceiver : 接收消息內(nèi)容:MessageDto(data=null, title=null, content=null)
以上就是SpringBoot整合Redis實現(xiàn)消息發(fā)布與訂閱的示例代碼的詳細內(nèi)容,更多關(guān)于SpringBoot Redis消息發(fā)布 訂閱的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java模擬實現(xiàn)撲克牌洗牌和發(fā)牌的示例代碼
這篇文章主要為大家詳細介紹了如何利用Java模擬實現(xiàn)撲克牌洗牌和發(fā)牌的功能,文中的示例代碼講解詳細,感興趣的小伙伴可以嘗試一下2022-09-09
Netty分布式NioEventLoop優(yōu)化selector源碼解析
這篇文章主要介紹了Netty分布式NioEventLoop優(yōu)化selector源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03
淺析Java設(shè)計模式編程中的單例模式和簡單工廠模式
這篇文章主要介紹了淺析Java設(shè)計模式編程中的單例模式和簡單工廠模式,使用設(shè)計模式編寫代碼有利于團隊協(xié)作時程序的維護,需要的朋友可以參考下2016-01-01
SpringBoot?調(diào)用外部接口的三種實現(xiàn)方法
Spring Boot調(diào)用外部接口的方式有多種,常見的有以下三種方式:RestTemplate、Feign 和 WebClient,本文就詳細介紹一下,感興趣的可以了解一下2023-08-08
Java基于logback?MessageConverter實現(xiàn)日志脫敏方案分析
本文介紹了一種日志脫敏方案,即基于logbackMessageConverter和正則匹配的方法,該方法的優(yōu)點是侵入性低,工作量少,只需修改xml配置文件,適用于老項目,感興趣的朋友跟隨小編一起看看吧2024-10-10

