JAVA中通過Redis實現(xiàn)延時任務demo實例
先說結論,有兩種方式可以實現(xiàn):
- 通過Redis監(jiān)聽過期key實現(xiàn)。
- 使用Redisson 內置的延時隊列實現(xiàn)。
1.監(jiān)聽key過期事件實現(xiàn)
1.1 實現(xiàn)原理
Redis在2.0版本時引入了發(fā)布訂閱(pub/sub)功能,在發(fā)布訂閱中有一個channel(頻道),與消息隊列中的topic(主題)類似??梢酝ㄟ^redis的發(fā)布訂閱者模式實現(xiàn)延時任務功能。
pub/sub即發(fā)布者publisher和訂閱者subscriber,也可以叫生產者和消費者。發(fā)布者通過PUBLISH投遞消息給指定的channel,訂閱者通過SUBSCRIBER訂閱自己關心的channel,訂閱者可以訂閱一個或者多個不同的channel。
在發(fā)布訂閱模式下生產者需要將消息發(fā)送到指定的channel中,消費者需要訂閱對應channel拿到想要的消息。Redis中有很多默認的channel,這些channel是由Redis本身向他們發(fā)送消息的,這不是我們自己編寫的代碼,其中keyevent@:expired 是其中的一個默認channel,db表示的是redis的哪一個數(shù)據(jù)庫。這個channel負責監(jiān)聽過期的key,也就是說如果有一個key過期了,那么redis會將這個key過期的信息發(fā)送到這個頻道,我們只需要監(jiān)聽這個頻道就可以拿到對應的過期key信息,這樣我們就能實現(xiàn)一個延遲任務功能了。
舉個列子:比如我現(xiàn)在需要實現(xiàn)一個郵件提醒功能,需要在任務發(fā)布后的前24小時通過郵件通知未完成的用戶。我們可以在任務發(fā)布時設置一個key,這個key的過期時間是當前時間到任務前24小時,監(jiān)聽對應的key過期channel,當key過期后拿到對應的key,去執(zhí)行你自定義的業(yè)務邏輯即可,當然這個key需要你進行設計,比如可以為任務id等等。
1.2 實現(xiàn)Demo
現(xiàn)在有一個會議室預約的系統(tǒng),用戶可以通過該系統(tǒng)填寫預約理由進行預約,該預約請求需要管理員完成審核后才能生效。有一個需求,如果該預約沒有被審批,那么需要自動將該預約申請置為超期未處理。這里我們就可以使用延時任務實現(xiàn)這個功能。
第一步我們需要在房間進行預約操作的時候,同時去緩存一個key,這個key就緩存成房間預約申請的id,這樣當key過期時,我們就能拿到對應的申請信息,從而去通知對應的審核人。
房間預約操作時設置對應緩存key:
private void setRoomApplyNotifyCache(RoomReservation roomReservation, String userId) {
// 記錄當前時間->房間預約起始時間,redis緩存,用于判斷是否管理員超期未處理,自動更改狀態(tài),通知用戶房間預約超期未處理,防止占用時間段,用戶可以重新預約
long cacheTimeSecond = DateUtil.between(new Date(), new Date(roomReservation.getStartTime()), DateUnit.SECOND);
String roomOccupancyApplyKey = "record_reserve_key:" + roomReservation.getId();
redisCacheUtil.setCacheObject(roomOccupancyApplyKey, userId, cacheTimeSecond, TimeUnit.SECONDS);
// 前一個小時提醒負責人審核。 預約間隔最少是30分鐘
long cacheNotifyChargerSecond = cacheTimeSecond - (60 * 60);
// 當前時間距離預約起始時間小于一個小時
if (cacheTimeSecond <= 3600L && cacheTimeSecond > 1800L) {
// 不足一個小時,但是大于半個小時
cacheNotifyChargerSecond = cacheTimeSecond - (30 * 60);
} else if (cacheTimeSecond < 1800L) {
// 不設置通知審核人
return;
}
// 緩存
String notifyChargerKey = RedisCacheKey.ROOM_APPLY_TIMEOUT_NOTIFY_KEY.concatKey(roomReservation.getId());
redisCacheUtil.setCacheObject(notifyChargerKey, userId, cacheNotifyChargerSecond, TimeUnit.SECONDS);
}監(jiān)聽key過期channel并作出處理
@Component
public class RedisExpiredKeyListenerComponent extends KeyExpirationEventMessageListener {
// 通過構造函數(shù)注入 RedisMessageListenerContainer 給 KeyExpirationEventMessageListener
public RedisExpiredKeyListenerComponent(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, new PatternTopic("__keyevent@0__:expired"));
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
if (expiredKey.startsWith("record_reserve_key:")) {
String reserveId = expiredKey.substring("record_reserve_key:".length());
// 根據(jù)id查詢房間預約信息,發(fā)送給審核人通知郵件。
// ....
}
}
}這樣就非常簡單的實現(xiàn)了延時任務的功能。
1.3 有什么缺陷?
- 時效性差
為什么這么說?因為過期事件消息是在Redis刪除key時才發(fā)布的,而不是key過期時就發(fā)布了。
Redis中常用的過期策略有:
- 惰性刪除
只會在取出key時判斷key是否已經過期,這樣對cpu比較友好,因為不用頻繁的去掃描所有的key。 - 定期刪除
每隔一段時間抽取一批key執(zhí)行過期key刪除操作。并且,Redis 底層會通過限制刪除操作執(zhí)行的時長和頻率來減少刪除操作對 CPU 時間的影響。
定期刪除對內存更加友好,惰性刪除對 CPU 更加友好。兩者各有千秋,所以 Redis 采用的是 定期刪除+惰性/懶漢式刪除 。
因此,就會存在我設置了 key 的過期時間,但到了指定時間 key 還未被刪除,進而沒有發(fā)布過期事件的情況。
- 丟消息
Redis 的 pub/sub 模式中的消息并不支持持久化,這與消息隊列不同。在 Redis 的 pub/sub 模式中,發(fā)布者將消息發(fā)送給指定的頻道,訂閱者監(jiān)聽相應的頻道以接收消息。當沒有訂閱者時,消息會被直接丟棄,在 Redis 中不會存儲該消息。 - 多服務實例的情況下存在消息重復問題
Redis 的 pub/sub 模式目前只有廣播模式,這意味著當生產者向特定頻道發(fā)布一條消息時,所有訂閱相關頻道的消費者都能夠收到該消息。
這個時候,我們需要注意多個服務實例重復處理消息的問題,這會增加代碼開發(fā)量和維護難度。
2. 通過Redission實現(xiàn)
1、引入 Redission 依賴:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.2</version>
</dependency>2、創(chuàng)建 Redisson 配置類:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
return Redisson.create(config);
}
}3、封裝了一個延遲隊列類 RedissonDelayQueue
@Component
public class RedissonDelayQueue {
private static final Logger log = LoggerFactory.getLogger(RedissonDelayQueue.class);
@Autowired
private RedissonClient redissonClient;
// 延遲隊列
private RDelayedQueue<String> delayQueue;
// 阻塞隊列
private RBlockingQueue<String> blockingQueue;
private ExecutorService executorService;
public RedissonDelayQueue() {
this.executorService = new ThreadPoolExecutor(
5,
10,
0L, TimeUnit.MILLISECONDS,
new java.util.concurrent.LinkedBlockingQueue<>(),
new CustomThreadFactory()
);
}
@PostConstruct
public void init() {
blockingQueue = redissonClient.getBlockingQueue("myQueue");
delayQueue = redissonClient.getDelayedQueue(blockingQueue);
startConsumer();
}
private void startConsumer() {
executorService.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
// 從阻塞隊列中獲取任務
String task = blockingQueue.take();
log.info("Received task: {}", task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Error processing task", e);
}
}
});
}
public void addTask(String task, long delay) {
log.info("Add task: {} with delay: {} seconds", task, delay);
// 將任務添加到延遲隊列
delayQueue.offer(task, delay, TimeUnit.SECONDS);
}
private static class CustomThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "DelayQueue-Consumer");
thread.setDaemon(true);
return thread;
}
}
}RedissonDelayQueue 中的兩個核心方法:
startConsumer():啟動一個消費者線程,從阻塞隊列blockingQueue中獲取任務并處理。addTask(String task, long delay):將一個任務添加到延遲隊列中,并指定延遲時間。
4、編一個 Controller 測試一下:
@RestController
public class TaskController {
@Autowired
private RedissonDelayQueue redissonDelayQueue;
@PostMapping("/addTask")
public void addTask(@RequestParam String task, @RequestParam long delay) {
redissonDelayQueue.addTask(task, delay);
}
}GET http://localhost:8080/addTask?task=test-task&delay=15
控制臺輸出:

可以看到任務的確是延遲了15s后開始執(zhí)行的。
3. 為什么用Redisson更好?
Redisson 是一個開源的 Java 語言 Redis 客戶端,提供了很多開箱即用的功能,比如多種分布式鎖的實現(xiàn)、延時隊列。
我們可以借助 Redisson 內置的延時隊列 RDelayedQueue 來實現(xiàn)延時任務功能。
Redisson 的延遲隊列 RDelayedQueue 是基于 Redis 的 SortedSet 來實現(xiàn)的。SortedSet 是一個有序集合,其中的每個元素都可以設置一個分數(shù),代表該元素的權重。Redisson 利用這一特性,將需要延遲執(zhí)行的任務插入到 SortedSet 中,并給它們設置相應的過期時間作為分數(shù)。
Redisson 在客戶端(即應用程序進程)中啟動一個定時任務,到時間后使用 zrangebyscore 命令掃描 SortedSet 中過期的元素(即分數(shù)小于或等于當前時間的元素),然后將這些過期元素從 SortedSet 中移除,并將它們加入到就緒消息列表( List 結構)中。
當任務被移到實際的就緒消息列表中時,Redisson 通常還會通過發(fā)布/訂閱機制(Redis 的 Pub/Sub 模型)來通知消費者有新任務到達。
就緒消息列表是一個阻塞隊列,消費者可以使用阻塞操作(如 BLPOP key 0,0 表示無限等待,直到有消息進入隊列)監(jiān)聽。由于 Redis 的 Pub/Sub 機制是事件驅動的,它避免了輪詢開銷,只有在有新消息時才會觸發(fā)處理邏輯。
注意:Redisson 的定時任務調度器并不是以固定的時間間隔頻繁調用 zrangebyscore 命令進行掃描,而是根據(jù) SortedSet 中最近的到期時間來動態(tài)調整下一次檢查的時間點。
當然對于幾天或者幾周后才會執(zhí)行的任務,可以結合mysql進行優(yōu)化??梢酝ㄟ^定時任務(例如 XXL-JOB、Spring Task)定期(如每 15 分鐘或 30 分鐘)掃描 MySQL 中即將到期的任務(例如在未來 2 小時內到期的任務)并推送到 Redis 中。
4. 為什么不直接用消息隊列呢?
在我的項目中(https://github.com/MuShanYu/apply-room-record),由于沒有其他場景需要使用消息隊列,因此不想為了單一的延時任務場景引入消息隊列。引入 MQ 會增加系統(tǒng)的復雜性,需要維護額外的組件和配置,還會增加成本,這是不太可取的。
如果項目將來確實有需要引入 MQ 的場景且 Redis 延時任務確實不再滿足項目需求,我會考慮將延時任務的實現(xiàn)平滑遷移到 MQ 上。
個人項目中使用的是簡單的key過期監(jiān)聽策略,正在優(yōu)化。
希望這篇文章能夠對你有所幫助。
總結
Redis在2.0版本時引入了發(fā)布訂閱(pub/sub)功能,在發(fā)布訂閱中有一個channel(頻道),與消息隊列中的topic(主題)類似,可以通過redis的發(fā)布訂閱者模式實現(xiàn)延時任務功能,實例中會議室預約系統(tǒng),用戶預約管理員審核后生效,如未審批,需要自動變超期未處理,使用延時任務。
到此這篇關于JAVA中通過Redis實現(xiàn)延時任務demo實例的文章就介紹到這了,更多相關JAVA中Redis實現(xiàn)延時內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Springboot整合hibernate validator 全局異常處理步驟詳解
本文分步驟給大家介紹Springboot整合hibernate validator 全局異常處理,補呢文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01
Java編程之jdk1.4,jdk1.5和jdk1.6的區(qū)別分析(經典)
這篇文章主要介紹了Java編程之jdk1.4,jdk1.5和jdk1.6的區(qū)別分析,結合實例形式較為詳細的分析說明了jdk1.4,jdk1.5和jdk1.6版本的使用區(qū)別,需要的朋友可以參考下2015-12-12
IntelliJ IDEA像Eclipse一樣打開多個項目的圖文教程
這篇文章主要介紹了IntelliJ IDEA像Eclipse一樣打開多個項目的方法圖文教程講解,需要的朋友可以參考下2018-03-03
spring boot定時任務接收郵件并且存儲附件的方法講解
今天小編就為大家分享一篇關于spring boot定時任務接收郵件并且存儲附件的方法講解,小編覺得內容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03

