Java編程生產(chǎn)者消費(fèi)者實(shí)現(xiàn)的四種方法
實(shí)現(xiàn)生產(chǎn)者消費(fèi)者的四種方式
一、最基礎(chǔ)的
利用 wait() 和 notify() 方法實(shí)現(xiàn),當(dāng)緩沖區(qū)滿或?yàn)榭諘r(shí)都調(diào)用 wait() 方法等待,當(dāng)生產(chǎn)者生產(chǎn)了一個(gè)產(chǎn)品或消費(fèi)者消費(fèi)了一個(gè)產(chǎn)品后會喚醒所有線程;
package com.practice;
public class testMain {
private static Integer count = 0;
private static final Integer FULL = 10;
private static String LOCK = "lock";
public static void main(String[] args) {
testMain testMain = new testMain();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
}
class Producer implements Runnable{
@Override
public void run(){
for (int i = 0; i < 10; i++) {
try{
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}
synchronized (LOCK){
while(count == FULL){//緩存空間滿了
try{
LOCK.wait();//線程阻塞
}catch (Exception e){
e.printStackTrace();
}
}
count++;//生產(chǎn)者
System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有"+count);
LOCK.notifyAll();//喚醒所有線程
}
}
}
}
class Consumer implements Runnable{
@Override
public void run(){
for (int i = 0; i < 10; i++) {
try{
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
synchronized (LOCK){
while(count == 0){
try{
LOCK.wait();
}catch (Exception e){
}
}
count--;
System.out.println(Thread.currentThread().getName() + "消費(fèi)者消費(fèi),目前總共有 "+count);
LOCK.notifyAll();//喚醒所有線程
}
}
}
}
}
二、java.util.concurrent.lock 中的 Lock 框架
通過對 lock 的 lock() 方法和 unlock() 方法實(shí)現(xiàn)對鎖的顯示控制,而 synchronize() 則是對鎖的隱形控制,可重入鎖也叫做遞歸鎖,指的是同一個(gè)線程外層函數(shù)獲得鎖之后,內(nèi)層遞歸函數(shù)仍然有獲取該鎖的代碼,但不受影響;
簡單來說,該鎖維護(hù)這一個(gè)與獲取鎖相關(guān)的計(jì)數(shù)器,如果擁有鎖的某個(gè)線程再次得到鎖,那么獲計(jì)數(shù)器就加1,函數(shù)調(diào)用結(jié)束計(jì)數(shù)器就減1,然后鎖需要釋放兩次才能獲得真正釋放,已經(jīng)獲取鎖的線程進(jìn)入其他需要相同鎖的同步代碼塊不會被阻塞
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest {
private static Integer count = 0;
private static Integer FULL = 10;
//創(chuàng)建一個(gè)鎖對象
private Lock lock = new ReentrantLock();
//創(chuàng)建兩個(gè)條件變量,一個(gè)為緩沖非滿,一個(gè)緩沖區(qū)非空
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public static void main(String[] args){
ReentrantLockTest testMain = new ReentrantLockTest();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
}
class Producer implements Runnable{
@Override
public void run(){
for (int i = 0; i <10; i++) {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
// 獲取鎖
lock.lock();
try {
while (count == FULL) {
try{
notFull.await();
}catch(InterruptedException e){
e.printStackTrace();
}
}
count++;
System.out.println(Thread.currentThread().getName()
+ "生產(chǎn)者生產(chǎn),目前總共有" + count);
}finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable{
@Override
public void run(){
for (int i = 0; i <10; i++) {
try{
Thread.sleep(3000);
}
catch (Exception e){
e.printStackTrace();
}
lock.lock();
try{
while(count==0){
try{
notEmpty.await();
}catch (InterruptedException e){
e.printStackTrace();
}
}
count--;
System.out.println(Thread.currentThread().getName() +
"消費(fèi)者消費(fèi),目前總共有 " + count);
}finally {
lock.unlock();//解鎖
}
}
}
}
}
三、阻塞隊(duì)列BlockingQueue的實(shí)現(xiàn)
被阻塞的情況主要分為如下兩種,BlockingQueue 是線程安全的
1,當(dāng)隊(duì)列滿了的時(shí)候進(jìn)行入隊(duì)操作;
2,當(dāng)隊(duì)列空的時(shí)候進(jìn)行出隊(duì)操作
Blockqueue 接口的一些方法

四類方法分別對應(yīng)于:
1,ThrowsException,如果操作不能馬上進(jìn)行,則拋出異常;
2,SpecialValue 如果操作不能馬上進(jìn)行,將會返回一個(gè)特殊的值,true或false;
3,Blocks 操作被阻塞;
4,TimeOut 指定時(shí)間未執(zhí)行返回一個(gè)特殊值 true 或 false
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 使用 BlockQueue 實(shí)現(xiàn)生產(chǎn)者消費(fèi)模型
*/
public class BlockQueueTest {
public static Integer count = 0;
//創(chuàng)建一個(gè)阻塞隊(duì)列
final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
BlockQueueTest testMain = new BlockQueueTest();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
}
class Producer implements Runnable{
@Override
public void run(){
for (int i = 0; i <10; i++) {
try{
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}
try{
blockingQueue.put(1);
count++;
System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有 " + count);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
@Override
public void run(){
for (int i = 0; i <10; i++) {
try{
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
try{
blockingQueue.take();//消費(fèi)
count--;
System.out.println(Thread.currentThread().getName() +
" 消費(fèi)者消費(fèi),目前總共有 "+ count);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
}
四、信號量 Semaphore 的實(shí)現(xiàn)
Semaphore (信號量) 用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。Java中的 Semaphone 維護(hù)了一個(gè)許可集,一開始設(shè)定這個(gè)許可集的數(shù)量,使用 acquire() 方法獲得一個(gè)許可,當(dāng)許可不足時(shí)會被阻塞,release() 添加一個(gè)許可。
下面代碼中,還加入了 mutex 信號量,維護(hù)消費(fèi)者和生產(chǎn)者之間的同步關(guān)系,保證生產(chǎn)者消費(fèi)者之間的交替進(jìn)行
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
private static Integer count = 0;
//創(chuàng)建三個(gè)信號量
final Semaphore notFull = new Semaphore(10);
final Semaphore notEmpty = new Semaphore(0);
final Semaphore mutex = new Semaphore(1);//互斥鎖,控制共享數(shù)據(jù)的互斥訪問
public static void main(String[] args) {
SemaphoreTest testMain = new SemaphoreTest();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
new Thread(testMain.new Producer()).start();
new Thread(testMain.new Consumer()).start();
}
class Producer implements Runnable{
@Override
public void run(){
for (int i = 0; i <10; i++) {
try{
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
try{
notFull.acquire();//獲取一個(gè)信號量
mutex.acquire();
count++;
System.out.println(Thread.currentThread().getName() +
"生產(chǎn)者生產(chǎn),目前總共有 "+count);
} catch (InterruptedException e){
e.printStackTrace();
} finally {
mutex.release();//添加
notEmpty.release();
}
}
}
}
class Consumer implements Runnable{
@Override
public void run(){
for (int i = 0; i <10; i++) {
try{
Thread.sleep(3000);
}catch(InterruptedException e){
e.printStackTrace();
}
try{
notEmpty.acquire();
mutex.acquire();
count--;
System.out.println(Thread.currentThread().getName() +
"消費(fèi)者消費(fèi),目前總共有"+count);
}catch (InterruptedException e){
e.printStackTrace();
}finally {
mutex.release();
notFull.release();
}
}
}
}
}
Reference
https://juejin.cn/post/6844903486895865864#comment
以上就是Java編程生產(chǎn)者消費(fèi)者實(shí)現(xiàn)的四種方法的詳細(xì)內(nèi)容,更多關(guān)于java實(shí)現(xiàn)生產(chǎn)消費(fèi)者的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java操作excel導(dǎo)入導(dǎo)出的3種方式
項(xiàng)目需要,要實(shí)現(xiàn)一個(gè)導(dǎo)入導(dǎo)出excel的功能,于是任務(wù)驅(qū)動(dòng)著我學(xué)習(xí)到了POI、easypoi和easyexcel這3個(gè)java操作Excel的工具,下面這篇文章主要給大家介紹了關(guān)于java操作excel導(dǎo)入導(dǎo)出的3種方式,需要的朋友可以參考下2023-05-05
超詳細(xì)解析Spring Bean的創(chuàng)建過程
這篇文章主要揭秘了Spring Bean的創(chuàng)建過程,文中通過代碼示例和圖文結(jié)合的方式解析的超級詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-05-05
java之lombok的構(gòu)建者模式Builder中的泛型寫法說明
這篇文章主要介紹了java之lombok的構(gòu)建者模式Builder中的泛型寫法說明,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01
淺析如何在SpringBoot中實(shí)現(xiàn)數(shù)據(jù)脫敏
脫敏是指在不改變原數(shù)據(jù)結(jié)構(gòu)的前提下,通過某種方式處理數(shù)據(jù),使數(shù)據(jù)不能直接暴露用戶的真實(shí)信息,下面我們就來看看SpringBoot中實(shí)現(xiàn)數(shù)據(jù)脫敏的具體方法吧2024-03-03
Java 文件傳輸助手的實(shí)現(xiàn)(單機(jī)版)
這篇文章主要介紹了Java 文件傳輸助手的實(shí)現(xiàn)(單機(jī)版),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05

