一文帶你深入了解Java中延時任務的實現(xiàn)
概述
延時任務相信大家都不陌生,在現(xiàn)實的業(yè)務中應用場景可以說是比比皆是。例如訂單下單15分鐘未支付直接取消,外賣超時自動賠付等等。這些情況下,我們該怎么設計我們的服務的實現(xiàn)呢?
笨一點的方法自然是定時任務去數(shù)據(jù)庫進行輪詢。但是當業(yè)務量較大,事件處理比較費時的時候,我們的系統(tǒng)和數(shù)據(jù)庫往往會面臨巨大的壓力,如果采用這種方式或許會導致數(shù)據(jù)庫和系統(tǒng)的崩潰。那么有什么好辦法嗎?今天我來為大家介紹幾種實現(xiàn)延時任務的辦法。
JAVA DelayQueue
你沒看錯,java內部有內置延時隊列,位于java concurrent包內。
DelayQueue是一個jdk中自帶的延時隊列實現(xiàn),他的實現(xiàn)依賴于可重入鎖ReentrantLock以及條件鎖Condition和優(yōu)先隊列PriorityQueue。而且本質上他也是一個阻塞隊列。那么他是如何實現(xiàn)延時效果的呢。
DelayQueue的實現(xiàn)原理
首先DelayQueue隊列中的元素必須繼承一個接口叫做Delayed,我們找到這個類
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}發(fā)現(xiàn)這個類內部定義了一個返回值為long的方法getDelay,這個方法用來定義隊列中的元素的過期時間,所有需要放在隊列中的元素,必須實現(xiàn)這個方法。
然后我們來看看延遲隊列的隊列是如何操作的,我們就拿最典型的offer和take來看:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}offer操作平平無奇,甚至直接調用到了優(yōu)先隊列的offer來將隊列根據(jù)延時進行排序,只不過加了個鎖,做了些數(shù)據(jù)的調整,沒有什么深入的地方,但是take的實現(xiàn)看上去就很復雜了。(注意,Dalayed繼承了Comparable方法,所以是可以直接用優(yōu)先隊列來排序的,只要你自己實現(xiàn)了compareTo方法)我嘗試加了些注釋讓各位看得更明白些:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 自選操作
for (;;) {
// 獲取隊列第一個元素,如果隊列為空
// 阻塞住直到有新元素加入隊列,offer等方法調用signal喚醒線程
E first = q.peek();
if (first == null)
available.await();
else {
// 如果隊列中有元素
long delay = first.getDelay(NANOSECONDS);
// 判斷延時時間,如果到時間了,直接取出數(shù)據(jù)并return
if (delay <= 0)
return q.poll();
first = null;
// 如果leader為空則阻塞
if (leader != null)
available.await();
else {
// 獲取當前線程
Thread thisThread = Thread.currentThread();
// 設置leader為當前線程
leader = thisThread;
try {
// 阻塞延時時間
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}我們可以看到take的實現(xiàn)依靠了無限自旋,直到第一個隊列元素過了超時時間后才會返回,否則等待他的只有被阻塞。
DelayQueue實現(xiàn)延時隊列的優(yōu)缺點
看了源碼后,我們應該對DelayQueue的實現(xiàn)有了一個大致的了解,也對他的優(yōu)缺點有了一定的理解。他的優(yōu)點很明顯:
- java原生支持,不需要引入第三方工具
- 線程安全,即插即用使用方便
但是他的缺點也是很明顯的:
- 不支持分布式,并且數(shù)據(jù)放在內存中,沒有持久化的支持,服務宕機會丟失數(shù)據(jù)
- 插入時使用的是優(yōu)先隊列的排序,時間復雜度較高,并且對于隊列中的任務不能很好的管理
所以有沒有更好的延時隊列的實現(xiàn)呢,我們繼續(xù)看下去~
時間輪算法
時間輪算法是一個被設計出來處理延時任務的算法,現(xiàn)實中的應用可以在kafka以及netty等項目中找到類似的實現(xiàn)。
時間輪的具體實現(xiàn)
所謂時間輪,顧名思義,他是一個類似于時鐘的結構,即他的主結構是一個環(huán)形數(shù)組,如圖:

環(huán)形數(shù)組中存放的是一個一個的鏈表,鏈表中存放著需要執(zhí)行的任務,我們設定好數(shù)組中執(zhí)行的間隔,假設我們的環(huán)形數(shù)組的長度是60,每個數(shù)組的執(zhí)行間隔為1s,那么我們會在每過1s就會執(zhí)行數(shù)組下一個元素中的鏈表中的元素。如果只是這樣,那么我們將無法處理60秒之外的延時任務,這顯然不合適,所以我們會在每個任務中加上一個參數(shù)圈數(shù),來表明任務會在幾圈后執(zhí)行。假如我們有一個任務是在150s后執(zhí)行,那么他應該在30s的位置,同時圈數(shù)應該為2。我們每次執(zhí)行一個鏈表中的任務的時候會把當圈需要執(zhí)行的任務取出執(zhí)行,然后把他從鏈表中刪除,如果任務不是當圈執(zhí)行,則修改他的圈數(shù),將圈數(shù)減1,于是一個簡單的時間輪出爐了。
那么這樣的時間輪有什么優(yōu)缺點呢?
先來說優(yōu)點吧:
- 相比
DelayQueue來說,時間輪的插入更加的高效,時間復雜度為O(1) - 實現(xiàn)簡單清晰,任務調度更加方便合理
當然他的缺點也不少:
- 他和
DelayQueue一樣不支持分布式,并且數(shù)據(jù)放在內存中,沒有持久化的支持,服務宕機會丟失數(shù)據(jù) - 數(shù)組間的間隔設置會影響任務的精度
- 由于不同圈數(shù)的任務會在同一個鏈表中,執(zhí)行到每個數(shù)組元素時需要遍歷所有的鏈表數(shù)據(jù),效率會很低
進階優(yōu)化版時間輪算法
剛才提到了一些時間輪算法的缺點,那么是不是有一些方法來進行下優(yōu)化?這里我來介紹一下時間輪的優(yōu)化版本。
之前我們提到不同圈數(shù)的任務會在同一個鏈表中被重復遍歷影響效率,這種情況下我們可以進行如下優(yōu)化:將時間輪進行分層

我們可以看到圖中,我們采用了多層級的設計,上圖中分了三層,每層都是60格,第一個輪盤中的間隔為1小時,我們的數(shù)據(jù)每一次都是插入到這個輪盤中,每當這個輪盤經過一個小時后來到下一個刻度,就會取出其中的所有元素,按照延遲時間放入到第二個象征著分鐘的輪盤中,以此類推。
這樣的實現(xiàn)好處可以說是顯而易見的:
- 首先避免了當時間跨度較大時空間的浪費
- 每一次到達刻度的時候我們不用再像以前那樣遍歷鏈表取出需要的數(shù)據(jù),而是可以一次性全部拿出來,大大節(jié)約了操作的時間
時間輪算法的應用
時間輪算法可能在之前大家沒有聽說過,但是他在各個地方都有著不小的作用。linux的定時器的實現(xiàn)中就有時間輪的身影,同樣如果你是一個喜好看源碼的讀者,你也可能會在kafka以及netty中找到他的實現(xiàn)。
kafka
kafka中應用了時間輪算法,他的實現(xiàn)和之前提到的進階版時間輪沒有太大的區(qū)別,只有在一點上:kafka內部實現(xiàn)的時間輪應用到了DelayQueue。
@nonthreadsafe
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
private[this] val interval = tickMs * wheelSize
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
private[this] var currentTime = startMs - (startMs % tickMs)
@volatile private[this] var overflowWheel: TimingWheel = null
private[this] def addOverflowWheel(): Unit = {
synchronized {
if (overflowWheel == null) {
overflowWheel = new TimingWheel(
tickMs = interval,
wheelSize = wheelSize,
startMs = currentTime,
taskCounter = taskCounter,
queue
)
}
}
}
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
false
} else if (expiration < currentTime + tickMs) {
false
} else if (expiration < currentTime + interval) {
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket)
}
true
} else {
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs)
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
}上面是kafka內部的實現(xiàn)(使用的語言是scala),我們可以看到實現(xiàn)非常的簡潔,并且使用到了DelayQueue。我們剛才已經討論過了DelayQueue的優(yōu)缺點,查看源碼后我們已經可以有一個大致的結論了:DelayQueue在kafka的時間輪中的作用是負責推進任務的,為的就是防止在時間輪中由于任務比較稀疏而造成的"空推進"。DelayQueue的觸發(fā)機制可以很好的避免這一點,同時由于DelayQueue的插入效率較低,所以僅用于底層的推進,任務的插入由時間輪來操作,兩者配置,可以實現(xiàn)效率和資源的平衡。
netty
netty的內部也有時間輪的實現(xiàn)HashedWheelTimer
HashedWheelTimer的實現(xiàn)要比kafka內部的實現(xiàn)復雜許多,和kafka不同的是,它的內部推進不是依靠的DelayQueue而是自己實現(xiàn)了一套,源碼太長,有興趣的讀者可以自己去看一下。
小結
時間輪說了這么多,我們可以看到他的效率是很出眾的,但是還是有這么一個問題:他不支持分布式。當我們的業(yè)務很復雜,需要分布式的時候,時間輪顯得力不從心,那么這個時候有什么好一點的延時隊列的選擇呢?我們或許可以嘗試使用第三方的工具
redis延時隊列
其實啊說起延時,我們如果常用redis的話,就會想起redis是存在過期機制的,那么我們是否可以利用這個機制來實現(xiàn)一個延時隊列呢?
redis自帶key的過期機制,而且可以設置過期后的回調方法?;诖颂匦?,我們可以非常容易就完成一個延時隊列,任務進來時,設定定時時間,并且配置好過期回調方法即可。
除了使用redis的過期機制之外,我們也可以利用它自帶的zset來實現(xiàn)延時隊列。zset支持高性能的排序,因此我們任務進來時可以將時間戳作為排序的依據(jù),以此將任務的執(zhí)行先后進行有序的排列,這樣也能實現(xiàn)延時隊列。
zset實現(xiàn)延時隊列的好處:
- 支持高性能排序
redis本身的高可用和高性能以及持久性
mq延時隊列
rocketmq延時消息
rocketmq天然支持延時消息,他的延時消息分為18個等級,每個等級對應不同的延時時間。
那么他的原理是怎樣的呢?
rocketmq的broker收到消息后會將消息寫入commitlog,并且判斷這個消息是否是延時消息(即delay屬性是否大于0),之后如果判斷確實是延時消息,那么他不會馬上寫入,而是通過轉發(fā)的方式將消息放入對應的延時topic(18個延時級別對應18個topic)
rocketmq會有一個定時任務進行輪詢,如果任務的延遲時間已經到了就發(fā)往指定的topic。
這個設計比較的簡單粗暴,但是缺點也十分明顯:
- 延時是固定的,如果想要的延遲超出18個級別就沒辦法實現(xiàn)
- 無法實現(xiàn)精準延時,隊列的堆積等等情況也會導致執(zhí)行產生誤差
rocketmq的精準延時消息
rocketmq本身是不支持的精確延遲的,他的商業(yè)版本ons倒是支持。不過rocketmq的社區(qū)中有相應的解決方案。方案是借助于時間輪算法來實現(xiàn)的,感興趣的朋友可以自行去社區(qū)查看。(社區(qū)中的一些未被合并的pr是不錯的實現(xiàn)參考)
總結
延時隊列的實現(xiàn)千千萬,但是如果要在生產中大規(guī)模使用,那么大部分情況下其實都避不開時間輪算法。改進過的時間輪算法可以做到精準延時,持久化,高性能,高可用性,可謂是完美。但是話又說回來,其他的延時方式就無用了嗎?其實不是的,所有的方式都是需要匹配自己的使用場景。如果你是極少量數(shù)據(jù)的輪詢,那么定時輪詢數(shù)據(jù)庫或許才是最佳的解決方案,而不是無腦的引入復雜的延時隊列。如果是單機的任務,那么jdk的延時隊列也是不錯的選擇。
本文介紹的這些延時隊列只是為了向大家展示他們的原理和優(yōu)缺點,具體的使用還需要結合自己業(yè)務的場景。
以上就是一文帶你深入了解Java中延時任務的實現(xiàn)的詳細內容,更多關于Java延時任務的資料請關注腳本之家其它相關文章!
相關文章
Java?axios與spring前后端分離傳參規(guī)范總結
這篇文章主要介紹了Java?axios與spring前后端分離傳參規(guī)范總結,文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,需要的朋友可以參考一下2022-08-08
JAVASE精密邏輯控制過程詳解(分支和循環(huán)語句)
在一個程序執(zhí)行的過程中各條語句的執(zhí)行順序對程序的結果是有直接影響的,這篇文章主要給大家介紹了關于JAVASE精密邏輯控制(分支和循環(huán)語句)的相關資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-04-04
Java SpringBoot Validation用法案例詳解
這篇文章主要介紹了Java SpringBoot Validation用法案例詳解,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內容,需要的朋友可以參考下2021-09-09
Java使用正則表達式去除小數(shù)點后面多余的0功能示例
這篇文章主要介紹了Java使用正則表達式去除小數(shù)點后面多余的0功能,結合具體實例形式分析了java字符串正則替換相關操作技巧,需要的朋友可以參考下2017-06-06

