生產(chǎn)消費者模式實現(xiàn)方式和線程安全問題代碼示例
生產(chǎn)者消費者模式的幾種實現(xiàn)方式
拿我們生活中的例子來說,工廠生產(chǎn)出來的產(chǎn)品總是要輸出到外面使用的,這就是生產(chǎn)與消費的概念。
在我們實際的軟件開發(fā)過程中,經(jīng)常會碰到如下場景:某個模塊負責產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數(shù)、線程、進程等)。
產(chǎn)生數(shù)據(jù)的模塊,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊,就稱為消費者。
第一種:采用wait—notify實現(xiàn)生產(chǎn)者消費者模式
1. 一生產(chǎn)者與一消費者:
2. 一生產(chǎn)者與多消費者:
第二種: 采用阻塞隊列實現(xiàn)生產(chǎn)者消費者模式
3. 使用阻塞隊列實現(xiàn)生產(chǎn)者消費者模式
相信大家都有去吃過日本料理。有個很誘人的餐食就是烤肉,烤肉師父會站在一邊一直烤肉,再將烤好的肉放在一個盤子中;而流著口水的我們這些食客會坐在一邊,只要盤子里有肉我們就會一直去吃。
在這個生活案例中,烤肉師父就是生產(chǎn)者,他就負責烤肉,烤完了就把肉放在盤子里,而不是直接遞給食客(即不用通知食客去吃肉),如果盤子肉滿,師父就會停一會,直到有人去食用烤肉后再去進行生產(chǎn)肉;而食客的我們就只是盯著盤子,一旦盤子有肉我們就負責去吃就行;
整個過程中食客與烤肉師父都不是直接打交道的,而是都與盤子進行交互。
盤子充當了一個緩沖區(qū)的概念,有東西生產(chǎn)出來就把東西放進去,盤子也是有大小限制,超過盤子大小就會阻塞生產(chǎn)者生產(chǎn),等待消費者去消費;當盤子為空的時候 ,即阻塞消費者消費,等待生產(chǎn)者去生產(chǎn)。
編程中阻塞隊列即可以實現(xiàn)盤子這個功能。
阻塞隊列的特點:
當隊列元素已滿的時候,阻塞插入操作;
當隊列元素為空的時候,阻塞獲取操作。
ArrayBlockingQueue 與 LinkedBlockingQueue都是支持FIFO(先進先出),但是LinkedBlockingQueue是無界的,而ArrayBlockingQueue 是有界的。
下面使用阻塞隊列實現(xiàn)生產(chǎn)者消費者:
生產(chǎn)者:
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable{
private final BlockingQueue blockingQueue;
//設置隊列緩存的大小。生產(chǎn)過程中超過這個大小就暫時停止生產(chǎn)
private final int QUEUE_SIZE = 10;
public Producer(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}
int task = 1;
@Override
public void run() {
while(true){
try {
System.out.println("正在生產(chǎn):" + task);
//將生產(chǎn)出來的產(chǎn)品放在隊列緩存中
blockingQueue.put(task);
++task;
//讓其停止一會,便于查看效果
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消費者:
import java.util.concurrent.BlockingQueue;
//消費者
public class Consumer implements Runnable{
private final BlockingQueue blockingQueue;
public Consumer(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
//只要阻塞隊列中有任務,就一直去消費
while(true){
try {
System.out.println("正在消費: " + blockingQueue.take());
//讓其停止一會,便于查看效果
Thread.sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
測試:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 生產(chǎn)者消費者模式
* 使用阻塞隊列BlockingQueue
* @author wanggenshen
*
*/
public class TestConPro {
public static void main(String[] args){
BlockingQueue blockingQueue = new LinkedBlockingQueue(5);
Producer p = new Producer(blockingQueue);
Consumer c = new Consumer(blockingQueue);
Thread tp = new Thread(p);
Thread tc= new Thread(c);
tp.start();
tc.start();
}
}
因為LinkedBlockingQueue是無界隊列,所以生產(chǎn)者會不斷去生產(chǎn),將生產(chǎn)出的任務放在隊列中,消費者去隊列中去消費:

如果改用有界阻塞隊列ArrayBlockingQueue,就可以初始化隊列的大小。則隊列中元素超過隊列大小的時候,生產(chǎn)者就會等待消費者消費一個再去生產(chǎn)一個:
測試代碼:
初始化一個大小為10的ArrayBlockingQueue:
public static void main(String[] args){
BlockingQueue blockingQueue = new ArrayBlockingQueue(10);
Producer p = new Producer(blockingQueue);
Consumer c = new Consumer(blockingQueue);
Thread tp = new Thread(p);
Thread tc= new Thread(c);
tp.start();
tc.start();
}
測試中,讓生產(chǎn)者生產(chǎn)速度略快,而消費者速度略慢一點。可以看到生產(chǎn)出來的產(chǎn)品序號與消費的產(chǎn)品序號差始終為10(隊列的大小):

總結(jié)
以上就是本文關于生產(chǎn)消費者模式實現(xiàn)方式和線程安全問題代碼示例的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
相關文章
基于SpringBoot+Redis實現(xiàn)分布式鎖
本文主要介紹了基于SpringBoot+Redis實現(xiàn)分布式鎖,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-05-05
Java并發(fā)編程之柵欄(CyclicBarrier)實例介紹
這篇文章主要介紹了Java并發(fā)編程之柵欄(CyclicBarrier)實例介紹,柵欄類似閉鎖,但是它們是有區(qū)別的,需要的朋友可以參考下2015-04-04
Springboot+WebSocket實現(xiàn)一對一聊天和公告的示例代碼
這篇文章主要介紹了Springboot+WebSocket實現(xiàn)一對一聊天和公告的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04
springboot druid mybatis多數(shù)據(jù)源配置方式
這篇文章主要介紹了springboot druid mybatis多數(shù)據(jù)源配置方式,具有很好的參考價值,希望對大家有所幫助,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12

