Redis監(jiān)聽過期的key實(shí)現(xiàn)流程詳解
一、簡(jiǎn)介
本文今天主要是講Redis中對(duì)過期key的監(jiān)聽,可能很多小伙伴不會(huì),或者使用會(huì)出現(xiàn)一些不可思議的問題,比如在系統(tǒng)中設(shè)置了一個(gè)緩存,希望在緩存失效后去做什么操作,但是實(shí)際中可能又出現(xiàn)了操作重復(fù)的問題。所以今天來(lái)討論下怎么正確使用。我們來(lái)個(gè)最簡(jiǎn)單的集群架構(gòu),如下圖:

我們上面圖中看到是服務(wù)A和服務(wù)B就是同一個(gè)服務(wù)的不同實(shí)例。
二、maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.alian</groupId>
<artifactId>expiration</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>expiration</name>
<description>redis-key-expiration-listener</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.package.directory>target</project.package.directory>
<java.version>1.8</java.version>
<!--com.fasterxml.jackson 版本-->
<jackson.version>2.9.10</jackson.version>
<!--阿里巴巴fastjson 版本-->
<fastjson.version>1.2.68</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--redis依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>${parent.version}</version>
</dependency>
<!--用于序列化-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!--java 8時(shí)間序列化-->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三、編碼實(shí)現(xiàn)
3.1、application.properties
# 端口
server.port=8090
# 上下文路徑
server.servlet.context-path=/expiration# Redis數(shù)據(jù)庫(kù)索引(默認(rèn)為0)
spring.redis.database=0
# Redis服務(wù)器地址
spring.redis.host=192.168.0.193
#spring.redis.host=127.0.0.1
# Redis服務(wù)器連接端口
spring.redis.port=6379
# Redis服務(wù)器連接密碼(默認(rèn)為空)
spring.redis.password=
# 連接池最大連接數(shù)(使用負(fù)值表示沒有限制)
spring.redis.jedis.pool.max-active=20
# 連接池中的最小空閑連接
spring.redis.jedis.pool.min-idle=10
# 連接池中的最大空閑連接
spring.redis.jedis.pool.max-idle=10
# 連接池最大阻塞等待時(shí)間(使用負(fù)值表示沒有限制)
spring.redis.jedis.pool.max-wait=20000
# 讀時(shí)間(毫秒)
spring.redis.timeout=10000
# 連接超時(shí)時(shí)間(毫秒)
spring.redis.connect-timeout=10000
3.2、Redis配置類
RedisConfig
package com.alian.expiration.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
@Configuration
public class RedisConfig {
/**
* redis配置
*
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
// 實(shí)例化redisTemplate
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
//設(shè)置連接工廠
redisTemplate.setConnectionFactory(redisConnectionFactory);
// key采用String的序列化
redisTemplate.setKeySerializer(keySerializer());
// value采用jackson序列化
redisTemplate.setValueSerializer(valueSerializer());
// Hash key采用String的序列化
redisTemplate.setHashKeySerializer(keySerializer());
// Hash value采用jackson序列化
redisTemplate.setHashValueSerializer(valueSerializer());
// 支持事務(wù)
// redisTemplate.setEnableTransactionSupport(true);
//執(zhí)行函數(shù),初始化RedisTemplate
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* key類型采用String序列化
*
* @return
*/
private RedisSerializer<String> keySerializer() {
return new StringRedisSerializer();
}
/**
* value采用JSON序列化
*
* @return
*/
private RedisSerializer<Object> valueSerializer() {
//設(shè)置jackson序列化
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
//設(shè)置序列化對(duì)象
jackson2JsonRedisSerializer.setObjectMapper(getMapper());
return jackson2JsonRedisSerializer;
}
/**
* 使用com.fasterxml.jackson.databind.ObjectMapper
* 對(duì)數(shù)據(jù)進(jìn)行處理包括java8里的時(shí)間
*
* @return
*/
private ObjectMapper getMapper() {
ObjectMapper mapper = new ObjectMapper();
//設(shè)置可見性
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//默認(rèn)鍵入對(duì)象
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
//設(shè)置Java 8 時(shí)間序列化
JavaTimeModule timeModule = new JavaTimeModule();
timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
//禁用把時(shí)間轉(zhuǎn)為時(shí)間戳
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
mapper.registerModule(timeModule);
return mapper;
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}和我們之前整合redis差不多,只不過在最后增加了一個(gè)redis消息監(jiān)聽監(jiān)聽容器RedisMessageListenerContainer
3.3、監(jiān)聽器
RedisKeyExpirationListener
package com.alian.expiration.listener;
import com.alian.expiration.service.RedisExpirationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
@Autowired
private RedisExpirationService redisExpirationService;
// 把我們上面一步配置的bean注入進(jìn)去
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 針對(duì)redis數(shù)據(jù)失效事件,進(jìn)行數(shù)據(jù)處理
*
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 用戶做自己的業(yè)務(wù)處理即可,注意message.toString()可以獲取失效的key
String expiredKey = message.toString();
log.info("onMessage --> redis 過期的key是:{}", expiredKey);
try {
// 對(duì)過期key進(jìn)行處理
redisExpirationService.processingExpiredKey(expiredKey);
log.info("過期key處理完成:{}", expiredKey);
} catch (Exception e) {
e.printStackTrace();
log.error("處理redis 過期的key異常:{}", expiredKey, e);
}
}
}實(shí)現(xiàn)的步驟如下:
- 繼承KeyExpirationEventMessageListener
- 把redis消息監(jiān)聽監(jiān)聽容器RedisMessageListenerContainer 注入到密鑰空間事件消息偵 聽器中
- 重寫onMessage方法
- 通過Message 的 toString() 方法就可以獲取到過期的key
- 對(duì)key中關(guān)鍵信息進(jìn)行業(yè)務(wù)處理,比如 id
3.4、服務(wù)類
RedisExpirationService
package com.alian.expiration.service;
import com.alian.expiration.util.SignUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class RedisExpirationService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void processingExpiredKey(String expiredKey) {
// 如果是優(yōu)惠券的key(一定要規(guī)范命名)
if (expiredKey.startsWith("com.mall.coupon.id")) {
// 臨時(shí)key,此key可以在業(yè)務(wù)處理完,然后延遲一定時(shí)間刪除,或者不處理
String tempKey = SignUtils.md5(expiredKey, "UTF-8");
// 臨時(shí)key不存在才設(shè)置值,key超時(shí)時(shí)間為10秒(此處相當(dāng)于分布式鎖的應(yīng)用)
Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey, "1", 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(exist)) {
log.info("Business Handing...");
// 比如截取里面的id,然后關(guān)聯(lián)數(shù)據(jù)庫(kù)進(jìn)行處理
} else {
log.info("Other service is handing...");
}
} else {
log.info("Expired keys without processing");
}
}
}
基本流程如下:
- 判斷是否是需要處理的key,一般這種key通過命名規(guī)范加以處理
- 以當(dāng)前key生成一個(gè)新的key作為分布式key
- 如果redis中不存在這個(gè)新的key,則為新的key設(shè)置一個(gè)值,達(dá)到分布式服務(wù)處理(核心)
- 設(shè)置成功的,進(jìn)行業(yè)務(wù)處理;設(shè)置失敗了,說明其他服務(wù)正在處理這個(gè)key
- 根據(jù) key 的關(guān)鍵信息(比如截取id),進(jìn)行業(yè)務(wù)處理
3.5、工具類
SignUtils
package com.alian.expiration.util;
import java.security.MessageDigest;
public class SignUtils {
public static final String md5(String s, String charset) {
char[] hexDigits = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
try {
byte[] btInput = s.getBytes(charset);
MessageDigest mdInst = MessageDigest.getInstance("MD5");
mdInst.update(btInput);
byte[] md = mdInst.digest();
int j = md.length;
char[] str = new char[j * 2];
int k = 0;
for (byte byte0 : md) {
str[k++] = hexDigits[byte0 >>> 4 & 15];
str[k++] = hexDigits[byte0 & 15];
}
return new String(str);
} catch (Exception var11) {
return "";
}
}
}
四、測(cè)試
4.1、測(cè)試類
簡(jiǎn)單模擬下發(fā)送一個(gè)優(yōu)惠券數(shù)據(jù)到redis,然后設(shè)置超時(shí)時(shí)間
package com.alian.expiration;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RedisKeyExpirationTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Test
public void keyExpiration() {
// 優(yōu)惠券信息
String id = "2023021685264735";
Map<String, String> map = new HashMap<>();
map.put("id", id);
map.put("amount", "1000");
map.put("type", "1001");
map.put("describe", "滿減紅包");
// 緩存到redis
redisTemplate.opsForHash().putAll("com.mall.coupon.id." + id, map);
// 設(shè)置過期時(shí)間
redisTemplate.expire("com.mall.coupon.id." + id, 10, TimeUnit.SECONDS);
}
}
4.2、單實(shí)例
單實(shí)例就是服務(wù)只部署了一份,我們啟動(dòng)一份,端口是8090,然后通過上面的測(cè)試類,發(fā)送一個(gè)消息,結(jié)果如下:
10:23:39 701 INFO [container-2]:onMessage --> redis 過期的key是:com.mall.coupon.id.2023021685264735
10:23:39 988 INFO [container-2]:Business Handing...
10:23:39 989 INFO [container-2]:過期key處理完成:com.mall.coupon.id.2023021685264735
10:23:50 005 INFO [container-3]:onMessage --> redis 過期的key是:450FCC35415BADC16805962CA5BC7E12
10:23:50 005 INFO [container-3]:Expired keys without processing
10:23:50 005 INFO [container-3]:過期key處理完成:450FCC35415BADC16805962CA5BC7E12
4.3、多實(shí)例
多實(shí)例就是服務(wù)部署了多份,比如我們啟動(dòng)兩份,端口分別為8090和8091,然后通過上面的測(cè)試類,發(fā)送一個(gè)消息,8090端口的服務(wù)結(jié)果如下(Business Handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 過期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Business Handing...
11:39:06 707 INFO [container-2]:過期key處理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 過期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:過期key處理完成:450FCC35415BADC16805962CA5BC7E12
8091端口的服務(wù)結(jié)果如下(Other service is handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 過期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Other service is handing...
11:39:06 707 INFO [container-2]:過期key處理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 過期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:過期key處理完成:450FCC35415BADC16805962CA5BC7E12
結(jié)果分析:
- 多實(shí)例的情況下,每個(gè)實(shí)例都會(huì)收到過期key通知
- 通過redis分布式鎖,實(shí)現(xiàn)只有一個(gè)實(shí)例會(huì)進(jìn)行業(yè)務(wù)處理,防止重復(fù)
- 使用分布式鎖會(huì)有一個(gè)新的key過期,并且收到該key的通知,你可以業(yè)務(wù)執(zhí)行完延遲一定時(shí)間(避免重復(fù)執(zhí)行),再刪除,也可以不處理(因?yàn)楸揪筒皇且幚順I(yè)務(wù)的key)
結(jié)語(yǔ)
多實(shí)例的情況下,每個(gè)實(shí)例都會(huì)收到過期key通知,可以通過分布式鎖的方式去處理業(yè)務(wù),避免業(yè)務(wù)重復(fù)執(zhí)行
到此這篇關(guān)于Redis監(jiān)聽過期的key實(shí)現(xiàn)流程詳解的文章就介紹到這了,更多相關(guān)Redis監(jiān)聽key內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java畢業(yè)設(shè)計(jì)實(shí)戰(zhàn)之二手書商城系統(tǒng)的實(shí)現(xiàn)
這是一個(gè)使用了java+JSP+Springboot+maven+mysql+ThymeLeaf+FTP開發(fā)的二手書商城系統(tǒng),是一個(gè)畢業(yè)設(shè)計(jì)的實(shí)戰(zhàn)練習(xí),具有在線書城該有的所有功能,感興趣的朋友快來(lái)看看吧2022-01-01
mybatis-plus如何配置自定義數(shù)據(jù)類型TypeHandle
這篇文章主要介紹了mybatis-plus如何配置自定義數(shù)據(jù)類型TypeHandle,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01
MyBatis中的循環(huán)插入insert foreach問題
這篇文章主要介紹了MyBatis中的循環(huán)插入insert foreach問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11
java中實(shí)現(xiàn)一個(gè)定時(shí)任務(wù)的方式
本文介紹了三種在Java中實(shí)現(xiàn)定時(shí)任務(wù)的方法,并推薦使用Spring Boot注解方式,介紹了如何使用`@Scheduled`注解結(jié)合Cron表達(dá)式來(lái)設(shè)置定時(shí)任務(wù),并提供了一個(gè)示例配置文件2025-03-03
Java使用Fastjson進(jìn)行JSON數(shù)據(jù)操作教程詳解
Fastjson?是一個(gè)?Java?庫(kù),可以用來(lái)將?Java?對(duì)象轉(zhuǎn)換為它們的?JSON?表示,本文主要為大家詳細(xì)介紹了Java如何使用Fastjson進(jìn)行JSON數(shù)據(jù)操作,需要的可以參考下2023-12-12
多個(gè)SpringBoot項(xiàng)目采用redis實(shí)現(xiàn)Session共享功能
這篇文章主要介紹了多個(gè)SpringBoot項(xiàng)目采用redis實(shí)現(xiàn)Session共享,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09

