Java并發(fā)編程之阻塞隊列詳解
1、什么是阻塞隊列?
隊列是一種數(shù)據(jù)結(jié)構(gòu),它有兩個基本操作:在隊列尾部加入一個元素,從隊列頭部移除一個元素。阻塞隊里與普通的隊列的區(qū)別在于,普通隊列不會對當前線程產(chǎn)生阻塞,在面對類似消費者-生產(chǎn)者模型時,就必須額外的實現(xiàn)同步策略以及線程間喚醒策略。使用阻塞隊列,就會對當前線程產(chǎn)生阻塞,當隊列是空時,從隊列中獲取元素的操作將會被阻塞,當隊列是滿時,往隊列里添加元素的操作也會被阻塞。

2、主要的阻塞隊列及其方法
java.util.concurrent包下提供主要的幾種阻塞隊列,主要有以下幾個:
1)ArrayBlockingQueue:基于數(shù)組實現(xiàn)的阻塞隊列,在創(chuàng)建ArrayBlockingQueue對象時必須指定其容量大小,還可以指定訪問策略,默認情況下為非公平的,即不保證等待時間最長的線程最優(yōu)先能夠訪問隊列。
2)、LinkedBlockingQueue:基于鏈表實現(xiàn)的一個阻塞隊列,在創(chuàng)建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。
3)、以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優(yōu)先級對元素進行排序,按照優(yōu)先級順序出隊,每次出隊的元素都是優(yōu)先級最高的元素。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。
4)、DelayQueue:基于PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數(shù)據(jù)的操作(生產(chǎn)者)永遠不會被阻塞,而只有獲取數(shù)據(jù)的操作(消費者)才會被阻塞。
阻塞隊列包括了非阻塞隊列中的大部分方法,還提供另外若干非常有用的方法:
put方法用來向隊尾存入元素,如果隊列滿,則等待;
take方法用來從隊首取元素,如果隊列為空,則等待;
offer方法用來向隊尾存入元素,如果隊列滿,則等待一定的時間,當時間期限達到時,如果還沒有插入成功,則返回false;否則返回true;
poll方法用來從隊首取元素,如果隊列空,則等待一定的時間,當時間期限達到時,如果取到,則返回null;否則返回取得的元素;
下面看一段代碼:
import java.util.concurrent.ArrayBlockingQueue;
/**
* @author 作者:徐劍 E-mail:anxu_2013@163.com
* @version 創(chuàng)建時間:2016年3月20日 下午12:52:53
* 類說明
*/
public class BlockingQueue
{
public static void main(String[] args) throws InterruptedException
{
java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
for (int i = 0; i < 10; i++)
{
// 將指定元素添加到此隊列中
blockingQueue.put("加入元素" + i);
System.out.println("向阻塞隊列中添加了元素:" + i);
}
System.out.println("程序到此運行結(jié)束,即將退出----");
}
}
當限制阻塞隊列數(shù)量為5時,添加了5個元素之后,繼續(xù)添加將會隊列外阻塞等待,此時程序并未終止。

當隊列滿了之后,我們將隊首元素移除,則可以繼續(xù)向阻塞隊列中添加元素,代碼如下:
public class BlockingQueue
{
public static void main(String[] args) throws InterruptedException
{
java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
for (int i = 0; i < 10; i++)
{
// 將指定元素添加到此隊列中
blockingQueue.put("加入元素" + i);
System.out.println("向阻塞隊列中添加了元素:" + i);
if(i>=4)
System.out.println("移除隊首元素"+blockingQueue.take());
}
System.out.println("程序到此運行結(jié)束,即將退出----");
}
執(zhí)行結(jié)果如下:

3、阻塞隊列的實現(xiàn)原理
下面主要看一下ArrayBlockingQueue的實現(xiàn)原理。
首先看一下ArrayBlockingQueue類的成員變量:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 底層存儲結(jié)構(gòu)-數(shù)組 */
final Object[] items;
/** 隊首元素下標 */
int takeIndex;
/** 隊尾元素下標 */
int putIndex;
/**隊列元素總數(shù) */
int count;
/** 重入鎖 */
final ReentrantLock lock;
/** notEmpty等待條件 */
private final Condition notEmpty;
/** notFull等待條件 */
private final Condition notFull;
/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;
可以看到,ArrayBlockingQueue用來存儲元素的實際上是一個數(shù)組。
再看下ArrayBlockingQueue兩個重要方法的實現(xiàn),put()和take():
public void put(E e) throws InterruptedException
{
//先檢查e是否為空
checkNotNull(e);
//獲取鎖
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
{
//當隊列已滿,進入條件等待
while (count == items.length)
notFull.await();
//隊列不滿,進行入隊列操作
enqueue(e);
}
finally
{
//釋放鎖
lock.unlock();
}
}
再看下具體的入隊操作:
private void enqueue(E x)
{
final Object[] items = this.items;
//隊尾入隊
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
//隊列總數(shù)+1
count++;
//notempty條件的等待集中隨機選擇一個線程,解除其阻塞狀態(tài)
notEmpty.signal();
}
下面是take()方法的源代碼:
public E take() throws InterruptedException
{
//獲取鎖
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
{
//隊列為空
while (count == 0)
//線程加入notEmpty條件等待集
notEmpty.await();
//非空,出隊列
return dequeue();
} finally
{
//釋放鎖
lock.unlock();
}
}
4、阻塞隊列的應用:實現(xiàn)消費者-生產(chǎn)者模式
/**
* @author 作者:徐劍 E-mail:anxu_2013@163.com
* @version 創(chuàng)建時間:2016年3月20日 下午2:21:55
* 類說明:阻塞隊列實現(xiàn)的消費者-生產(chǎn)者模式
*/
public class Test
{
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
public static void main(String[] args)
{
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread
{
@Override
public void run()
{
consume();
}
private void consume()
{
while (true)
{
try
{
queue.take();
System.out.println("從隊列取走一個元素,隊列剩余" + queue.size() + "個元素");
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}
class Producer extends Thread
{
@Override
public void run()
{
produce();
}
private void produce()
{
while (true)
{
try
{
queue.put(1);
System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+ (queueSize - queue.size()));
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}
}
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助。
相關文章
SpringBoot+Hibernate實現(xiàn)自定義數(shù)據(jù)驗證及異常處理
這篇文章主要為大家介紹了SpringBoot如何整合Hibernate自定義數(shù)據(jù)驗證及多種方式異常處理,文中的示例代碼講解詳細,感興趣的可以了解一下2022-04-04
Java利用JSONPath操作JSON數(shù)據(jù)的技術指南
JSONPath?是一種強大的工具,用于查詢和操作?JSON?數(shù)據(jù),類似于?SQL?的語法,它為處理復雜的?JSON?數(shù)據(jù)結(jié)構(gòu)提供了簡單且高效的解決方案,本文將介紹?JSONPath?的基本語法,并通過詳細的?Java?示例展示其實際應用,需要的朋友可以參考下2025-04-04
源碼分析Spring?中?@Qualifier?注解基本用法
這篇文章主要介紹了源碼分析Spring?中?@Qualifier?注解基本用法,在源碼分析的過程中,也?GET?到?Spring?許多新的玩法,感興趣的小伙伴趕緊去試試吧2023-08-08
Springmvc 4.x利用@ResponseBody返回Json數(shù)據(jù)的方法
這篇文章主要介紹了Springmvc 4.x利用@ResponseBody返回Json數(shù)據(jù)的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-04-04

