Java阻塞延遲隊列DelayQueue原理及使用詳解
前言
從阻塞隊列說起,阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。
這兩個附加的操作是:在隊列為空時,從隊列中獲取元素的消費(fèi)者線程會一直等待直到隊列變?yōu)榉强铡?/p>
當(dāng)隊列滿時,向隊列中放置元素的生產(chǎn)者線程會等待直到隊列可用。
阻塞隊列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費(fèi)者是從隊列里拿元素的線程。
在阻塞隊列不可用時,這兩個附加操作提供了4種處理方式:

- 拋出異常:當(dāng)隊列滿時,插入元素會拋出IllegalStateException;
- 返回特殊值:offer()是入隊方法,當(dāng)插入成功時返回true,插入失敗返回false;poll()是出隊方法,當(dāng)出隊成功時返回元素的值,隊列為空時返回null
- 一直阻塞:當(dāng)隊列滿時,阻塞執(zhí)行插入方法的線程;當(dāng)隊列空時,阻塞執(zhí)行出隊方法的線程
- 超時退出:顧名思義
下面是Java常見的阻塞隊列。
- ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列
- LinkedBlockingQueue :一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列
- PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列
- DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列
- SynchronousQueue:一個不存儲元素的阻塞隊列
- LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列
- LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列
DelayQueue解析
DelayQueue隊列中每個元素都有一個過期時間,并且隊列是個優(yōu)先級隊列,當(dāng)從隊列獲取元素的時候,只有過期元素才會出隊,DelayQueue的類結(jié)構(gòu)如下圖所示:

如圖DelayQueue中內(nèi)部使用的是PriorityQueue存放數(shù)據(jù),使用ReentrantLock實現(xiàn)線程同步。
另外隊列里面的元素要實現(xiàn)Delayed接口,一個是獲取當(dāng)前剩余時間的接口,一個是元素比較的接口,因為這個是有優(yōu)先級的隊列。
DelayQueue類的主要成員
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 持有內(nèi)部重入鎖。
private final transient ReentrantLock lock = new ReentrantLock();
// 優(yōu)先級隊列,存放工作任務(wù)。
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
// 依賴于重入鎖的condition。
private final Condition available = lock.newCondition();
}元素入隊列
插入元素到隊列中主要三個方法,但實際上底層調(diào)用的都是offer(e)方法
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return offer(e);
}
/**
* Inserts the specified element into this delay queue. As the queue is
* unbounded this method will never block.
*
* @param e the element to add
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) {
offer(e);
}
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
//獲取到重入鎖
lock.lock();
try {
q.offer(e);
//添加成功元素
if (q.peek() == e) {
leader = null;
//將等待隊列中的頭節(jié)點(diǎn)移動到同步隊列。
available.signal();
}
return true;
} finally {
lock.unlock();
}
}首先獲取獨(dú)占鎖,然后添加元素到優(yōu)先級隊列,由于q是優(yōu)先級隊列,所以添加完元素后,peek()方法返回的并不一定是剛才添加的元素,如果判斷為true,說明當(dāng)前元素e的優(yōu)先級最小也就是即將過期的,這時候激活avaliable變量條件隊列里面的線程,通知它們隊列里面有元素了。
從隊列中取元素
有兩個方法可以取元素(都是取隊頭),poll()方法取隊頭當(dāng)隊頭元素沒過期時返回null,take()方法取隊頭當(dāng)隊頭元素沒過期時會一直等待。
/**
* Retrieves and removes the head of this queue, or returns {@code null}
* if this queue has no elements with an expired delay.
*
* @return the head of this queue, or {@code null} if this
* queue has no elements with an expired delay
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
//如果隊列為空,或者不為空但是隊頭元素沒有過期則返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
// 獲取鎖。每個延遲隊列內(nèi)聚了一個重入鎖。
final ReentrantLock lock = this.lock;
// 獲取可中斷的鎖。
lock.lockInterruptibly();
try {
for (;;) {
// 嘗試從優(yōu)先級隊列中獲取隊列頭部元素,獲取但不移除
E first = q.peek();
if (first == null)
//無元素,當(dāng)前線程節(jié)點(diǎn)加入等待隊列,并阻塞當(dāng)前線程
available.await();
else {
// 通過延遲任務(wù)的getDelay()方法獲取延遲時間
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//延遲時間到期,獲取并刪除頭部元素。
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 線程節(jié)點(diǎn)進(jìn)入等待隊列 x 納秒。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 若還存在元素的話,則將等待隊列頭節(jié)點(diǎn)中的線程節(jié)點(diǎn)移動到同步隊列中。
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}重點(diǎn)說一下take()方法,第一次調(diào)用take時候由于隊列空,所以把當(dāng)前線程放入available的條件隊列中等待,當(dāng)執(zhí)行offer()成功并且添加的新元素恰好就是優(yōu)先級隊列的隊首時就會通知最先等待的線程激活,循環(huán)重新獲取隊首元素,這時候first假如不空,則調(diào)用getDelay()方法看該元素還剩下多少時間就過期了,如果delay<=0則說明已經(jīng)過期,則直接出隊返回。
否則看leader是否為null,不為null則說明是其他線程也在執(zhí)行take()則把當(dāng)前線程放入條件隊列,否則就是只有當(dāng)前線程在執(zhí)行take()方法,則當(dāng)前線程await直到剩余過期時間到,這期間該線程會釋放鎖,所以其他線程可以offer()添加元素,也可以take()阻塞自己,剩余過期時間到后,當(dāng)前線程會重新競爭鎖,重新進(jìn)入循環(huán)。
如果已經(jīng)具備了JUC包中的Lock接口以及AQS的相關(guān)知識,上述代碼大部分應(yīng)該都比較容易理解。
DelayQueue將實現(xiàn)了Delayed接口的對象添加到優(yōu)先級隊列中,通過在可重入鎖的Condition上調(diào)用await()方法,實現(xiàn)了延遲獲取阻塞隊列中元素的功能。
總結(jié)
- DelayQueue是一個內(nèi)部依靠AQS隊列同步器所實現(xiàn)的無界延遲阻塞隊列。
- 隊列中的延遲對象需要覆蓋getDelay()與compareTo()方法,并且要注意 getDelay()的時間單位的統(tǒng)一,compareTo()根據(jù)業(yè)務(wù)邏輯進(jìn)行合理的比較邏輯重寫。
- DelayQueue中內(nèi)聚的可重入鎖是非公平的。
- 延遲隊列是實現(xiàn)定時任務(wù)的關(guān)鍵,ScheduledThreadPoolExecutor中的任務(wù)隊列是DelayedWorkQueue,其和DelayedQueue高度類似,也是一個延遲隊列。
DelayQueue使用例子
寫一個簡單的例子:
public class DelayQueueTest {
public static final int SIZE = 10;
public static void main(String[] args) {
DelayQueueTest test = new DelayQueueTest();
//初始化線程池
BlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor
(5, 10, 10, TimeUnit.MILLISECONDS,
arrayBlockingQueue, Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
DelayQueue<DelayedTask> delayTaskQueue = new DelayQueue<>();
//模擬SIZE個延遲任務(wù)
for (byte i = 0; i < SIZE; i++) {
Long runAt = System.currentTimeMillis() + 1000 * i;
String name = "Zhang_" + i;
byte age = (byte)(10 + i);
String gender = (i % 2 == 0 ? "male" : "female");
Student student = new StudentBuilder(name, age, gender).height(150 + i).province("ZheJiang").build();
delayTaskQueue.put(new DelayedTask<Student>(student, 1, function -> test.print(student), runAt));
}
while (true) {
if (delayTaskQueue.size() == 0) {
break;
}
try {
//從延遲隊列中取值,如果沒有對象過期則取到null
DelayedTask delayedTask = delayTaskQueue.poll();
if (delayedTask != null) {
threadPool.execute(delayedTask);
}
} catch (Exception e) {
e.printStackTrace();
}
}
threadPool.shutdown();
}
public String print(Object object) {
System.out.println(Thread.currentThread().getName());
String str = ">>>junit log>>>" + object.getClass().getSimpleName() + ":" + object.toString();
System.out.println(str);
return str;
}
private static class DelayedTask<T> implements Delayed, Runnable {
/**
* 任務(wù)參數(shù)
*/
private T taskParam;
/**
* 任務(wù)類型
*/
private Integer type;
/**
* 任務(wù)函數(shù)
*/
private Function<T, String> function;
/**
* 任務(wù)執(zhí)行時刻
*/
private Long runAt;
public T getTaskParam() {
return taskParam;
}
public Integer getType() {
return type;
}
public Function<T, String> getFunction() {
return function;
}
public Long getRunAt() {
return runAt;
}
DelayedTask(T taskParam, Integer type, Function<T, String> function, Long runAt) {
this.taskParam = taskParam;
this.type = type;
this.function = function;
this.runAt = runAt;
}
@Override
public void run() {
if (taskParam != null) {
function.apply(taskParam);
}
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.runAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
DelayedTask object = (DelayedTask)o;
return this.runAt.compareTo(object.getRunAt());
}
}
}運(yùn)行結(jié)果如下,由于10個元素的延遲時間均相差1秒,可以看到逐步打印的效果。

DelayQueue典型場景是重試機(jī)制實現(xiàn),比如當(dāng)調(diào)用接口失敗后,把當(dāng)前調(diào)用信息放入delay=10s的元素,然后把元素放入隊列,那么這個隊列就是一個重試隊列,一個線程通過take()方法獲取需要重試的接口,take()返回則接口進(jìn)行重試,失敗則再次放入隊列,同時也可以在元素加上重試次數(shù)。
到此這篇關(guān)于Java阻塞延遲隊列DelayQueue原理及使用詳解的文章就介紹到這了,更多相關(guān)阻塞延遲隊列DelayQueue原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring注解中@Autowired和@Bean的區(qū)別詳解
這篇文章主要詳細(xì)介紹了Spring注解中@Autowired和@Bean二者有什么區(qū)別,文中通過兩個注解的使用場景介紹了二者的區(qū)別,感興趣的同學(xué)可以參考閱讀2023-06-06
Java?Stream流中的filter()使用方法舉例詳解
filter()是Java?Stream?API中的中間操作,用于根據(jù)給定的Predicate條件篩選流中的元素,它通過接收一個返回boolean值的函數(shù)(斷言)作為參數(shù),篩選出滿足條件的元素并收集到新的流中,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-11-11
Spring Boot項目中jar包在服務(wù)器上啟動的正確姿勢
這篇文章主要給大家介紹了關(guān)于Spring Boot項目中jar包在服務(wù)器上啟動的正確姿勢,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-01-01
SpringBoot數(shù)據(jù)校驗及多環(huán)境配置的問題詳解
這篇文章主要介紹了SpringBoot數(shù)據(jù)校驗及多環(huán)境配置,本文以SpringBoot-02-Config 項目為例,給大家詳細(xì)介紹,需要的朋友可以參考下2021-09-09
Java數(shù)據(jù)類型轉(zhuǎn)換實例解析
這篇文章主要介紹了Java數(shù)據(jù)類型轉(zhuǎn)換實例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-11-11
Java volatile關(guān)鍵字原理剖析與實例講解
volatile是Java提供的一種輕量級的同步機(jī)制,Java?語言包含兩種內(nèi)在的同步機(jī)制:同步塊(或方法)和?volatile?變量,本文將詳細(xì)為大家總結(jié)Java volatile關(guān)鍵字,通過詳細(xì)的代碼示例給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07
spring boot實現(xiàn)阿里云視頻點(diǎn)播上傳視頻功能(復(fù)制粘貼即可)
這篇文章主要介紹了spring boot實現(xiàn)阿里云視頻點(diǎn)播上傳視頻功能(復(fù)制粘貼即可),本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12
Java NIO 文件通道 FileChannel 用法及原理
這篇文章主要介紹了Java NIO 文件通道 FileChannel 用法和原理,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01

