Java并發(fā)編程之阻塞隊列深入詳解
1. 什么是阻塞隊列
阻塞隊列是一種特殊的隊列,和數(shù)據(jù)結(jié)構(gòu)中普通的隊列一樣,也遵守先進(jìn)先出的原則同時,阻塞隊列是一種能保證線程安全的數(shù)據(jù)結(jié)構(gòu),并且具有以下兩種特性:當(dāng)隊列滿的時候,繼續(xù)向隊列中插入元素就會讓隊列阻塞,直到有其他線程從隊列中取走元素;當(dāng)隊列為空的時候,繼續(xù)出隊列也會讓隊列阻塞,直到有其他線程往隊列中插入元素
補(bǔ)充:線程阻塞的意思指代碼此時不會被執(zhí)行,即操作系統(tǒng)在此時不會把這個線程調(diào)度到CPU上去執(zhí)行了
2. 阻塞隊列的代碼使用
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingDeque;
public class Test {
public static void main(String[] args) throws InterruptedException {
//不能直接newBlockingDeque,因為它是一個接口,要向上轉(zhuǎn)型
//LinkedBlockingDeque內(nèi)部是基于鏈表方式來實現(xiàn)的
BlockingDeque<String> queue=new LinkedBlockingDeque<>(10);//此處可以指定一個具體的數(shù)字,這里的的10代表隊列的最大容量
queue.put("hello");
String elem=queue.take();
System.out.println(elem);
elem=queue.take();
System.out.println(elem);
}
}
注意: put方法帶有阻塞功能,但是offer不具有,所以一般用put方法(能使用offer方法的原因是 BlockingDeque繼承了Queue)

打印結(jié)果如上所示,當(dāng)打印了hello后,隊列為空,代碼執(zhí)行到elem=queue.take();就不會繼續(xù)往下執(zhí)行了,此時線程進(jìn)入阻塞等待狀態(tài),什么也不會打印了,直到有其他線程給隊列中放入新的元素為止
3. 生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型是在服務(wù)器開發(fā)和后端開發(fā)中比較常用的編程手段,一般用于解耦合和削峰填谷。
高耦合度:兩個代碼模塊的關(guān)聯(lián)關(guān)系比較高
高內(nèi)聚:一個代碼模塊內(nèi)各個元素彼此結(jié)合的緊密
因此,我們一般追求高內(nèi)聚低耦合,這樣會加快執(zhí)行效率,而使用生產(chǎn)者消費(fèi)者模型就可以解耦合
(1)應(yīng)用一:解耦合
我們以實際生活中的情況為例,這里有兩臺服務(wù)器:A服務(wù)器和B服務(wù)器,當(dāng)A服務(wù)器傳輸數(shù)據(jù)給B時,要是直接傳輸?shù)脑挘敲床皇茿向B推送數(shù)據(jù),就是B從A中拉取數(shù)據(jù),都是需要A和B直接交互,所以A和B存在依賴關(guān)系(A和B的耦合度比較高)。未來如果服務(wù)器需要擴(kuò)展,比如加一個C服務(wù)器,讓A給C傳數(shù)據(jù),那么改動就比較復(fù)雜,且會降低效率。這時我們可以加一個隊列,這個隊列為阻塞隊列,如果A把數(shù)據(jù)寫到隊列里,B從中取,那么隊列相當(dāng)于是中轉(zhuǎn)站(或者說交易場所),A相當(dāng)于生產(chǎn)者(提供數(shù)據(jù)),B相當(dāng)于消費(fèi)者(接收數(shù)據(jù)),此時就構(gòu)成了生產(chǎn)者消費(fèi)者模型,這樣會讓代碼耦合度更低,維護(hù)更方便,執(zhí)行效率更高。

在計算機(jī)中,生產(chǎn)者充當(dāng)其中一組線程,而消費(fèi)者充當(dāng)另一組線程,而交易場所就可以使用阻塞隊列了
(2)應(yīng)用二:削峰填谷

實際生活中
在河道中大壩算是一個很重要的組成部分了,如果沒有大壩,大家試想一下結(jié)果:當(dāng)汛期來臨后上游的水很大時,下游就會涌入大量的水發(fā)生水災(zāi)讓莊稼被淹沒;而旱期的話下游的水會很少可能會引發(fā)旱災(zāi)。若有大壩的話,汛期時大壩把多余的水存到大壩中,關(guān)閘蓄水,讓上游的水按一定速率往下流,避免突然一波大雨把下游淹了,這樣下游不至于出現(xiàn)水災(zāi)。旱期時大壩把之前儲存好的水放出來,還是讓讓水按一定速率往下流,避免下流太缺水,這樣既可以避免汛期發(fā)生洪澇又可以避免旱期發(fā)生旱災(zāi)了。
峰:相當(dāng)于汛期
谷:相當(dāng)于旱期
計算機(jī)中
這樣的情況在計算機(jī)中也是很典型的,尤其是在服務(wù)器開發(fā)中,網(wǎng)關(guān)通常會把互聯(lián)網(wǎng)中的請求轉(zhuǎn)發(fā)給業(yè)務(wù)服務(wù)器,比如一些商品服務(wù)器,用戶服務(wù)器,商家服務(wù)器(存放商家的信息),直播服務(wù)器。但因為互聯(lián)網(wǎng)過來的請求數(shù)量是多是少不可控,相當(dāng)于上游的水,如果突然來了一大波請求,網(wǎng)關(guān)即使能扛得住,后續(xù)的很多服務(wù)器收到很多請求也就會崩潰(處理一個請求涉及到一系列的數(shù)據(jù)庫操作,因為數(shù)據(jù)庫相關(guān)操作效率本身比較低,這樣請求多了就處理不過來了,因此就會崩潰)

所以實際情況中網(wǎng)關(guān)和業(yè)務(wù)服務(wù)器之間往往用一個隊列來緩沖,這個隊列就是阻塞隊列(交易場所),用這個隊列來實現(xiàn)生產(chǎn)者(網(wǎng)關(guān))消費(fèi)者(業(yè)務(wù)服務(wù)器)模型,把請求緩存到隊列中,后面的消費(fèi)者(業(yè)務(wù)服務(wù)器)按照自己固定的速率去讀請求。這樣當(dāng)請求很多時,雖然隊列服務(wù)器可能會稍微受到一定壓力,但能保證業(yè)務(wù)服務(wù)器的安全。
(3)相關(guān)代碼
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestDemo {
public static void main(String[] args) {
// 使用一個 BlockingQueue 作為交易場所
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
// 此線程作為消費(fèi)者
Thread customer = new Thread() {
@Override
public void run() {
while (true) {
// 取隊首元素
try {
Integer value = queue.take();
System.out.println("消費(fèi)元素: " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
customer.start();
// 此線程作為生產(chǎn)者
Thread producer = new Thread() {
@Override
public void run() {
for (int i = 1; i <= 10000; i++) {
System.out.println("生產(chǎn)了元素: " + i);
try {
queue.put(i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
producer.start();
try {
customer.join();
producer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

打印如上(此代碼是讓生產(chǎn)者通過sleep每過1秒生產(chǎn)一個元素,而消費(fèi)者不使用sleep,所以每當(dāng)生產(chǎn)一個元素時,消費(fèi)者都會立馬消費(fèi)一個元素)
4.阻塞隊列和生產(chǎn)者消費(fèi)者模型功能的實現(xiàn)
在學(xué)會如何使用BlockingQueue后,那么如何自己去實現(xiàn)一個呢?
主要思路:
- 1.利用數(shù)組
- 2.head代表隊頭,tail代表隊尾
- 3.head和tail重合后到底是空的還是滿的判斷方法:專門定義一個size記錄當(dāng)前隊列元素個數(shù),入隊列時size加1出隊列時size減1,當(dāng)size為0表示空,為數(shù)組最大長度就是滿的(也可以浪費(fèi)一個數(shù)組空間用head和tail重合表示空,用tail+1和head重合表示滿,但此方法較為麻煩,上一個方法較為直觀,因此我們使用上一個方法)
public class Test2 {
static class BlockingQueue {
private int[] items = new int[1000]; // 此處的1000相當(dāng)于隊列的最大容量, 此處暫時不考慮擴(kuò)容的問題.
private int head = 0;//定義隊頭
private int tail = 0;//定義隊尾
private int size = 0;//數(shù)組大小
private Object locker = new Object();
// put 用來入隊列
public void put(int item) throws InterruptedException {
synchronized (locker) {
while (size == items.length) {
// 隊列已經(jīng)滿了,阻塞隊列開始阻塞
locker.wait();
}
items[tail] = item;
tail++;
// 如果到達(dá)末尾, 就回到起始位置.
if (tail >= items.length) {
tail = 0;
}
size++;
locker.notify();
}
}
// take 用來出隊列
public int take() throws InterruptedException {
int ret = 0;
synchronized (locker) {
while (size == 0) {
// 對于阻塞隊列來說, 如果隊列為空, 再嘗試取元素, 就要阻塞
locker.wait();
}
ret = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
// 此處的notify 用來喚醒 put 中的 wait
locker.notify();
}
return ret;
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new BlockingQueue();
// 消費(fèi)者線程
Thread consumer = new Thread() {
@Override
public void run() {
while (true) {
try {
int elem = queue.take();
System.out.println("消費(fèi)元素: " + elem);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
consumer.start();
// 生產(chǎn)者線程
Thread producer = new Thread() {
@Override
public void run() {
for (int i = 1; i < 10000; i++) {
System.out.println("生產(chǎn)元素: " + i);
try {
queue.put(i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
producer.start();
consumer.join();
producer.join();
}
}

運(yùn)行結(jié)果如上。
注意:
- 1.wait和notify的正確使用
- 2.put和take都會產(chǎn)生阻塞情況,但阻塞條件是對立的,wait不會同時觸發(fā)(put喚醒take阻塞,take喚醒put阻塞)
到此這篇關(guān)于Java并發(fā)編程之阻塞隊列深入詳解的文章就介紹到這了,更多相關(guān)Java 阻塞隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java利用策略模式實現(xiàn)條件判斷,告別if else
策略模式定義了一系列算法,并且將每個算法封裝起來,使得他們可以相互替換,而且算法的變化不會影響使用算法的客戶端。本文將通過案例講解如何利用Java的策略模式實現(xiàn)條件判斷,告別if----else條件硬編碼,需要的可以參考一下2022-02-02
Java實現(xiàn)紅黑樹(平衡二叉樹)的詳細(xì)過程
紅黑樹接近平衡的二叉樹,插入,刪除函數(shù)跟平衡二叉樹一樣,只是平衡函數(shù)不同,下面這篇文章主要給大家介紹了關(guān)于Java實現(xiàn)紅黑樹(平衡二叉樹)的相關(guān)資料,需要的朋友可以參考下2021-10-10
jvm crash的崩潰日志詳細(xì)分析及注意點(diǎn)
本篇文章主要介紹了jvm crash的崩潰日志詳細(xì)分析及注意點(diǎn)。具有很好的參考價值,下面跟著小編一起來看下吧2017-04-04
SpringBoot如何讀取配置文件參數(shù)并全局使用
這篇文章主要介紹了SpringBoot如何讀取配置文件參數(shù)并全局使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12
java僅用30行代碼就實現(xiàn)了視頻轉(zhuǎn)音頻的批量轉(zhuǎn)換
這篇文章主要介紹了java僅用30行代碼就實現(xiàn)了視頻轉(zhuǎn)音頻的批量轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04

