Java多線程同步器代碼詳解
同步器
為每種特定的同步問(wèn)題提供了解決方案,同步器是一些使線程能夠等待另一個(gè)線程的對(duì)象,允許它們協(xié)調(diào)動(dòng)作。最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier 和Exchanger
Semaphore
Semaphore【信號(hào)標(biāo);旗語(yǔ)】,通過(guò)計(jì)數(shù)器控制對(duì)共享資源的訪問(wèn)。
測(cè)試類:
package concurrent;
import concurrent.thread.SemaphoreThread;
import java.util.concurrent.Semaphore;
/**
* 拿客
* www.coderknock.com
* QQ群:213732117
* 創(chuàng)建時(shí)間:2016年08月08日
* 描述:
*/
public class SemaphoreTest {
public static void main(String[] args) {
//在Thread里聲明并不是同一個(gè)對(duì)象
Semaphore semaphore = new Semaphore(3);
SemaphoreThread testA = new SemaphoreThread("A", semaphore);
SemaphoreThread testB = new SemaphoreThread("B", semaphore);
SemaphoreThread testC = new SemaphoreThread("C", semaphore);
SemaphoreThread testD = new SemaphoreThread("D", semaphore);
SemaphoreThread testE = new SemaphoreThread("E", semaphore);
SemaphoreThread testF = new SemaphoreThread("F", semaphore);
SemaphoreThread testG = new SemaphoreThread("G", semaphore);
testA.start();
testB.start();
testC.start();
testD.start();
testE.start();
testF.start();
testG.start();
}
}
線程寫法:
package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Semaphore;
/**
* 拿客
* www.coderknock.com
* QQ群:213732117
* 創(chuàng)建時(shí)間:2016年08月08日
* 描述:
*/
public class SemaphoreThread extends Thread {
private static final Logger logger = LogManager.getLogger(SemaphoreThread.class);
//創(chuàng)建有3個(gè)信號(hào)量的信號(hào)量計(jì)數(shù)器
public Semaphore semaphore;
public SemaphoreThread(String name, Semaphore semaphore) {
setName(name);
this.semaphore = semaphore;
}
@Override
public void run() {
try {
logger.debug(getName() + " 取號(hào)等待... " + System.currentTimeMillis());
//取出一個(gè)信號(hào)
semaphore.acquire();
logger.debug(getName() + " 提供服務(wù)... " + System.currentTimeMillis());
sleep(1000);
logger.debug(getName() + " 完成服務(wù)... " + System.currentTimeMillis());
}
catch (InterruptedException e) {
e.printStackTrace();
}
logger.debug(getName() + " 釋放... " + System.currentTimeMillis());
//釋放一個(gè)信號(hào)
semaphore.release();
}
}
執(zhí)行結(jié)果【以下所有輸出結(jié)果中[]中為線程名稱- 后為輸出的內(nèi)容】:
[C] - C 取號(hào)等待... 1470642024037 [F] - F 取號(hào)等待... 1470642024036 [E] - E 取號(hào)等待... 1470642024036 [B] - B 取號(hào)等待... 1470642024037 [D] - D 取號(hào)等待... 1470642024037 [A] - A 取號(hào)等待... 1470642023965 [D] - D 提供服務(wù)... 1470642024039 [C] - C 提供服務(wù)... 1470642024039 [G] - G 取號(hào)等待... 1470642024036 [F] - F 提供服務(wù)... 1470642024040 [D] - D 完成服務(wù)... 1470642025039 [C] - C 完成服務(wù)... 1470642025039 [D] - D 釋放... 1470642025040 [F] - F 完成服務(wù)... 1470642025040 [C] - C 釋放... 1470642025041 [B] - B 提供服務(wù)... 1470642025042 [A] - A 提供服務(wù)... 1470642025042 [F] - F 釋放... 1470642025043 [E] - E 提供服務(wù)... 1470642025043 [A] - A 完成服務(wù)... 1470642026043 [B] - B 完成服務(wù)... 1470642026043 [B] - B 釋放... 1470642026043 [A] - A 釋放... 1470642026043 [G] - G 提供服務(wù)... 1470642026044 [E] - E 完成服務(wù)... 1470642026045 [E] - E 釋放... 1470642026045 [G] - G 完成服務(wù)... 1470642027045 [G] - G 釋放... 1470642027046
可以看到,當(dāng)3個(gè)信號(hào)量被領(lǐng)取完之后,之后的線程會(huì)阻塞在領(lǐng)取信號(hào)的位置,當(dāng)有信號(hào)量釋放之后才會(huì)繼續(xù)執(zhí)行。
CountDownLatch
CountDownLatch【倒計(jì)時(shí)鎖】,線程中調(diào)用countDownLatch.await()使進(jìn)程進(jìn)入阻塞狀態(tài),當(dāng)達(dá)成指定次數(shù)后(通過(guò)countDownLatch.countDown())繼續(xù)執(zhí)行每個(gè)線程中剩余的內(nèi)容。
一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。
用給定的計(jì)數(shù) 初始化 CountDownLatch。由于調(diào)用了 countDown() 方法,所以在當(dāng)前計(jì)數(shù)到達(dá)零之前,await 方法會(huì)一直受阻塞。之后,會(huì)釋放所有等待的線程,await 的所有后續(xù)調(diào)用都將立即返回。這種現(xiàn)象只出現(xiàn)一次——計(jì)數(shù)無(wú)法被重置。如果需要重置計(jì)數(shù),請(qǐng)考慮使用 CyclicBarrier。
測(cè)試類:
package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;
public class package concurrent;
import concurrent.thread.CountDownLatchThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
/**
* 拿客
* www.coderknock.com
* QQ群:213732117
* 創(chuàng)建時(shí)間:2016年08月08日
* 描述:
*/
public class CountDownLatchTest {
private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class);
public static void main(String[] args) throws InterruptedException {
//設(shè)定當(dāng)達(dá)成三個(gè)計(jì)數(shù)時(shí)觸發(fā)
CountDownLatch countDownLatch = new CountDownLatch(3);
new CountDownLatchThread("A", countDownLatch).start();
new CountDownLatchThread("B", countDownLatch).start();
new CountDownLatchThread("C", countDownLatch).start();
new CountDownLatchThread("D", countDownLatch).start();
new CountDownLatchThread("E", countDownLatch).start();
for (int i = 3; i > 0; i--) {
Thread.sleep(1000);
logger.debug(i);
countDownLatch.countDown();
}
}
}
線程類:
package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchThread extends Thread {
private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class);
//計(jì)數(shù)器
private CountDownLatch countDownLatch;
public CountDownLatchThread(String name, CountDownLatch countDownLatch) {
setName(name);
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
logger.debug("執(zhí)行操作...");
try {
sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
logger.debug("等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)...");
try {
//讓線程進(jìn)入阻塞狀態(tài),等待計(jì)數(shù)達(dá)成后釋放
countDownLatch.await();
logger.debug("計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行...");
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
執(zhí)行結(jié)果:
[E] - 執(zhí)行操作... [B] - 執(zhí)行操作... [A] - 執(zhí)行操作... [C] - 執(zhí)行操作... [D] - 執(zhí)行操作... [main] DEBUG concurrent.CountDownLatchTest - 3 [B] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [E] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [C] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [D] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [A] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [main] DEBUG concurrent.CountDownLatchTest - 2 [main] DEBUG concurrent.CountDownLatchTest - 1 [E] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [C] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [B] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [D] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [A] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行...
CyclicBarrier
CyclicBarrier【Cyclic周期,循環(huán)的 Barrier屏障,障礙】循環(huán)的等待阻塞的線程個(gè)數(shù)到達(dá)指定數(shù)量后使參與計(jì)數(shù)的線程繼續(xù)執(zhí)行并可執(zhí)行特定線程(使用不同構(gòu)造函數(shù)可以不設(shè)定到達(dá)后執(zhí)行),其他線程仍處于阻塞等待再一次達(dá)成指定個(gè)數(shù)。
測(cè)試類:
package concurrent;
import concurrent.thread.CyclicBarrierThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class);
public static void main(String[] args) {
//可以使用CyclicBarrier(int parties)不設(shè)定到達(dá)后執(zhí)行的內(nèi)容
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
logger.debug("---計(jì)數(shù)到達(dá)后執(zhí)行的內(nèi)容----");
}
);
new CyclicBarrierThread("A", cyclicBarrier).start();
new CyclicBarrierThread("B", cyclicBarrier).start();
new CyclicBarrierThread("C", cyclicBarrier).start();
new CyclicBarrierThread("D", cyclicBarrier).start();
new CyclicBarrierThread("E", cyclicBarrier).start();
new CyclicBarrierThread("A2", cyclicBarrier).start();
new CyclicBarrierThread("B2", cyclicBarrier).start();
new CyclicBarrierThread("C2", cyclicBarrier).start();
new CyclicBarrierThread("D2", cyclicBarrier).start();
new CyclicBarrierThread("E2", cyclicBarrier).start();
//需要注意的是,如果線程數(shù)不是上面設(shè)置的等待數(shù)量的整數(shù)倍,比如這個(gè)程序中又加了個(gè)線程,
// 那么當(dāng)達(dá)到5個(gè)數(shù)量時(shí),只會(huì)執(zhí)行達(dá)到時(shí)的五個(gè)線程的內(nèi)容,
// 剩余一個(gè)線程會(huì)出于阻塞狀態(tài)導(dǎo)致主線程無(wú)法退出,程序無(wú)法結(jié)束
// new CyclicBarrierThread("F", cyclicBarrier).start();//將這行注釋去掉程序無(wú)法自動(dòng)結(jié)束
}
}
線程類:
package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierThread extends Thread {
private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class);
private CyclicBarrier cyclicBarrier;
public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) {
super(name);
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
logger.debug("執(zhí)行操作...");
try {
int time = new Random().nextint(10) * 1000;
logger.debug("休眠" + time/1000 + "秒");
sleep(time);
}
catch (InterruptedException e) {
e.printStackTrace();
}
logger.debug("等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)...");
try {
//讓線程進(jìn)入阻塞狀態(tài),等待計(jì)數(shù)達(dá)成后釋放
cyclicBarrier.await();
logger.debug("計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行...");
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
執(zhí)行結(jié)果:
[A] - 執(zhí)行操作... [A] - 休眠0秒 [E2] - 執(zhí)行操作... [E2] - 休眠5秒 [D2] - 執(zhí)行操作... [D2] - 休眠4秒 [C2] - 執(zhí)行操作... [C2] - 休眠4秒 [B2] - 執(zhí)行操作... [B2] - 休眠6秒 [A2] - 執(zhí)行操作... [A2] - 休眠8秒 [E] - 執(zhí)行操作... [E] - 休眠5秒 [D] - 執(zhí)行操作... [D] - 休眠0秒 [C] - 執(zhí)行操作... [C] - 休眠3秒 [B] - 執(zhí)行操作... [B] - 休眠7秒 [A] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [D] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [C] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [D2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [C2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [C2] DEBUG concurrent.CyclicBarrierTest - ---計(jì)數(shù)到達(dá)后執(zhí)行的內(nèi)容---- [C2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [A] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [C] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [D2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [D] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [E2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [E] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [B2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [B] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [A2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [A2] DEBUG concurrent.CyclicBarrierTest - ---計(jì)數(shù)到達(dá)后執(zhí)行的內(nèi)容---- [E] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [B2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [E2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [B] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [A2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行...
可以想象成以前不正規(guī)的長(zhǎng)途汽車站的模式:
不正規(guī)的長(zhǎng)途汽車站會(huì)等待座位坐滿之后才發(fā)車,到達(dá)目的地之后繼續(xù)等待然后循環(huán)進(jìn)行。每個(gè)人都是一個(gè)Thread,上車后觸發(fā)cyclicBarrier.await();,當(dāng)坐滿時(shí)就是達(dá)到指定達(dá)成數(shù)的時(shí)候,車輛發(fā)車就是達(dá)成后統(tǒng)一執(zhí)行的內(nèi)容,發(fā)車后車上的人們就可以聊天之類的操作了【我們暫且理解為上車后人們就都不能動(dòng)了O(∩_∩)O~】。
CountDownLatch與CyclicBarrier區(qū)別:
CountDownLatch是一個(gè)或多個(gè)線程等待計(jì)數(shù)達(dá)成后繼續(xù)執(zhí)行,await()調(diào)用并沒有參與計(jì)數(shù)。
CyclicBarrier則是N個(gè)線程等待彼此執(zhí)行到零界點(diǎn)之后再繼續(xù)執(zhí)行,await()調(diào)用的同時(shí)參與了計(jì)數(shù),并且CyclicBarrier支持條件達(dá)成后執(zhí)行某個(gè)動(dòng)作,而且這個(gè)過(guò)程是循環(huán)性的。
Exchanger
Exchanger 用于線程間進(jìn)行數(shù)據(jù)交換
可以在對(duì)中對(duì)元素進(jìn)行配對(duì)和交換的線程的同步點(diǎn)。每個(gè)線程將條目上的某個(gè)方法呈現(xiàn)給 exchange 方法,與伙伴線程進(jìn)行匹配,并且在返回時(shí)接收其伙伴的對(duì)象。Exchanger 可能被視為 SynchronousQueue 的雙向形式?! xchanger 可能在應(yīng)用程序(比如遺傳算法和管道設(shè)計(jì))中很有用。
用法示例:以下是重點(diǎn)介紹的一個(gè)類,該類使用 Exchanger 在線程間交換緩沖區(qū),因此,在需要時(shí),填充緩沖區(qū)的線程獲取一個(gè)新騰空的緩沖區(qū),并將填滿的緩沖區(qū)傳遞給騰空緩沖區(qū)的線程。 測(cè)試類:
package concurrent;
import concurrent.pojo.ExchangerPojo;
import concurrent.thread.ExchangerThread;
import java.util.HashMap;
import java.util.concurrent.Exchanger;
public class ExchangerTest {
public static void main(String[] args) {
Exchanger<HashMap<String, ExchangerPojo>> exchanger = new Exchanger<>();
new ExchangerThread("A", exchanger).start();
new ExchangerThread("B", exchanger).start();
}
}
實(shí)體類:
package concurrent.pojo;
import com.alibaba.fastjson.JSON;
import java.util.Date;
import java.util.List;
public class ExchangerPojo {
private int intVal;
private String strVal;
private List<String> strList;
private Date date;
public ExchangerPojo(int intVal, String strVal, List<String> strList, Date date) {
this.intVal = intVal;
this.strVal = strVal;
this.strList = strList;
this.date = date;
}
public int getIntVal() {
return intVal;
}
public void setIntVal(int intVal) {
this.intVal = intVal;
}
public String getStrVal() {
return strVal;
}
public void setStrVal(String strVal) {
this.strVal = strVal;
}
public List<String> getStrList() {
return strList;
}
public void setStrList(List<String> strList) {
this.strList = strList;
}
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
線程類:
package concurrent.thread;
import concurrent.pojo.ExchangerPojo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.Exchanger;
public class ExchangerThread extends Thread {
private Exchanger<HashMap<String, ExchangerPojo>> exchanger;
private static final Logger logger = LogManager.getLogger(ExchangerThread.class);
public ExchangerThread(String name, Exchanger<HashMap<String, ExchangerPojo>> exchanger) {
super(name);
this.exchanger = exchanger;
}
@Override
public void run() {
HashMap<String, ExchangerPojo> map = new HashMap<>();
logger.debug(getName() + "提供者提供數(shù)據(jù)...");
Random random = new Random();
for (int i = 0; i < 3; i++) {
int index = random.nextint(10);
List<String> list = new ArrayList<>();
for (int j = 0; j < index; j++) {
list.add("list ---> " + j);
}
ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的數(shù)據(jù)", list, new Date());
map.put("第" + i + "個(gè)數(shù)據(jù)", pojo);
}
try {
int time = random.nextint(10);
logger.debug(getName() + "等待" + time + "秒....");
for (int i = time; i > 0; i--) {
sleep(1000);
logger.debug(getName() + "---->" + i);
}
//等待exchange是會(huì)進(jìn)入阻塞狀態(tài),可以在一個(gè)線程中與另一線程多次交互,此處就不寫多次了
HashMap<String, ExchangerPojo> getMap = exchanger.exchange(map);
time = random.nextint(10);
logger.debug(getName() + "接受到數(shù)據(jù)等待" + time + "秒....");
for (int i = time; i > 0; i--) {
sleep(1000);
logger.debug(getName() + "---->" + i);
}
getMap.forEach((x, y) -> {
logger.debug(x + " -----> " + y.toString());
}
);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
執(zhí)行結(jié)果:
[B] - B提供者提供數(shù)據(jù)...
[A] - A提供者提供數(shù)據(jù)...
[A] - A等待2秒....
[B] - B等待0秒....
[A] - A---->2
[A] - A---->1
[B] - B接受到數(shù)據(jù)等待1秒....
[A] - A接受到數(shù)據(jù)等待4秒....
[B] - B---->1
[A] - A---->4
[B] - 第0個(gè)數(shù)據(jù) -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的數(shù)據(jù)"}
[B] - 第1個(gè)數(shù)據(jù) -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的數(shù)據(jù)"}
[B] - 第2個(gè)數(shù)據(jù) -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的數(shù)據(jù)"}
[A] - A---->3
[A] - A---->2
[A] - A---->1
[A] - 第0個(gè)數(shù)據(jù) -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的數(shù)據(jù)"}
[A] - 第1個(gè)數(shù)據(jù) -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的數(shù)據(jù)"}
[A] - 第2個(gè)數(shù)據(jù) -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的數(shù)據(jù)"}
Phaser
Phaser個(gè)人感覺兼具了CountDownLatch與CyclicBarrier的功能,并提供了分階段的能力。
實(shí)現(xiàn)分階段的CyclicBarrier的功能
測(cè)試代碼:
package concurrent;
import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Phaser;
public class PhaserTest {
private static final Logger logger = LogManager.getLogger(PhaserTest.class);
public static void main(String[] args) {
Phaser phaser = new Phaser() {
/**此方法有2個(gè)作用:
* 1、當(dāng)每一個(gè)階段執(zhí)行完畢,此方法會(huì)被自動(dòng)調(diào)用,因此,重載此方法寫入的代碼會(huì)在每個(gè)階段執(zhí)行完畢時(shí)執(zhí)行,相當(dāng)于CyclicBarrier的barrierAction。
* 2、當(dāng)此方法返回true時(shí),意味著Phaser被終止,因此可以巧妙的設(shè)置此方法的返回值來(lái)終止所有線程。例如:若此方法返回值為 phase>=3,其含義為當(dāng)整個(gè)線程執(zhí)行了4個(gè)階段后,程序終止。
* */
@Override
protected Boolean onAdvance(int phase, int registeredParties) {
logger.debug("階段--->" + phase);
logger.debug("注冊(cè)的線程數(shù)量--->" + registeredParties);
return super.onAdvance(phase, registeredParties);
}
}
;
for (int i = 3; i > 0; i--) {
new PhaserThread("第" + i + "個(gè)", phaser).start();
}
}
}
線程代碼:
package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Random;
import java.util.concurrent.Phaser;
public class PhaserThread extends Thread {
private Phaser phaser;
private static final Logger logger = LogManager.getLogger(PhaserThread.class);
public PhaserThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
//把當(dāng)前線程注冊(cè)到Phaser
this.phaser.register();
logger.debug("name為" + name + "的線程注冊(cè)了" + this.phaser.getRegisteredParties() + "個(gè)線程");
}
@Override
public void run() {
logger.debug("進(jìn)入...");
phaser.arrive();
for (int i = 6; i > 0; i--) {
int time = new Random().nextint(5);
try {
logger.debug("睡眠" + time + "秒");
sleep(time * 1000);
if (i == 1) {
logger.debug("未完成的線程數(shù)量:" + phaser.getUnarrivedParties());
logger.debug("最后一次觸發(fā),并注銷自身");
phaser.arriveAndDeregister();
logger.debug("未完成的線程數(shù)量:" + phaser.getUnarrivedParties());
} else {
logger.debug("未完成的線程數(shù)量:" + phaser.getUnarrivedParties());
logger.debug(i + "--->觸發(fā)并阻塞...");
phaser.arriveAndAwaitAdvance();
//相當(dāng)于CyclicBarrier.await();
logger.debug("未完成的線程數(shù)量:" + phaser.getUnarrivedParties());
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.debug("注銷完成之后注冊(cè)的線程數(shù)量--->" + phaser.getRegisteredParties());
}
}
執(zhí)行結(jié)果:
[main] - name為第3個(gè)的線程注冊(cè)了1個(gè)線程 [main] - name為第2個(gè)的線程注冊(cè)了2個(gè)線程 [main] - name為第1個(gè)的線程注冊(cè)了3個(gè)線程 [第3個(gè)] - 進(jìn)入... [第2個(gè)] - 進(jìn)入... [第3個(gè)] - 睡眠2秒 [第2個(gè)] - 睡眠1秒 [第1個(gè)] - 進(jìn)入... [第1個(gè)] - 階段--->0 [第1個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第1個(gè)] - 睡眠4秒 [第2個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 6--->觸發(fā)并阻塞... [第3個(gè)] - 未完成的線程數(shù)量:2 [第3個(gè)] - 6--->觸發(fā)并阻塞... [第1個(gè)] - 未完成的線程數(shù)量:1 [第1個(gè)] - 6--->觸發(fā)并阻塞... [第1個(gè)] - 階段--->1 [第1個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 睡眠1秒 [第3個(gè)] - 睡眠0秒 [第2個(gè)] - 睡眠4秒 [第3個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 5--->觸發(fā)并阻塞... [第1個(gè)] - 未完成的線程數(shù)量:2 [第1個(gè)] - 5--->觸發(fā)并阻塞... [第2個(gè)] - 未完成的線程數(shù)量:1 [第2個(gè)] - 5--->觸發(fā)并阻塞... [第2個(gè)] - 階段--->2 [第2個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 睡眠0秒 [第3個(gè)] - 睡眠2秒 [第2個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 睡眠2秒 [第2個(gè)] - 4--->觸發(fā)并阻塞... [第3個(gè)] - 未完成的線程數(shù)量:2 [第1個(gè)] - 未完成的線程數(shù)量:2 [第3個(gè)] - 4--->觸發(fā)并阻塞... [第1個(gè)] - 4--->觸發(fā)并阻塞... [第1個(gè)] - 階段--->3 [第1個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 睡眠2秒 [第3個(gè)] - 睡眠1秒 [第2個(gè)] - 睡眠4秒 [第3個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 3--->觸發(fā)并阻塞... [第1個(gè)] - 未完成的線程數(shù)量:2 [第1個(gè)] - 3--->觸發(fā)并阻塞... [第2個(gè)] - 未完成的線程數(shù)量:1 [第2個(gè)] - 3--->觸發(fā)并阻塞... [第2個(gè)] - 階段--->4 [第2個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 睡眠2秒 [第1個(gè)] - 睡眠2秒 [第3個(gè)] - 睡眠4秒 [第2個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 2--->觸發(fā)并阻塞... [第1個(gè)] - 2--->觸發(fā)并阻塞... [第3個(gè)] - 未完成的線程數(shù)量:1 [第3個(gè)] - 2--->觸發(fā)并阻塞... [第3個(gè)] - 階段--->5 [第3個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 睡眠2秒 [第1個(gè)] - 睡眠3秒 [第2個(gè)] - 睡眠0秒 [第2個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 最后一次觸發(fā),并注銷自身 [第2個(gè)] - 未完成的線程數(shù)量:2 [第2個(gè)] - 注銷完成之后注冊(cè)的線程數(shù)量--->2 [第3個(gè)] - 未完成的線程數(shù)量:2 [第3個(gè)] - 最后一次觸發(fā),并注銷自身 [第3個(gè)] - 未完成的線程數(shù)量:1 [第3個(gè)] - 注銷完成之后注冊(cè)的線程數(shù)量--->1 [第1個(gè)] - 未完成的線程數(shù)量:1 [第1個(gè)] - 最后一次觸發(fā),并注銷自身 [第1個(gè)] - 階段--->6 [第1個(gè)] - 注冊(cè)的線程數(shù)量--->0 [第1個(gè)] - 未完成的線程數(shù)量:0 [第1個(gè)] - 注銷完成之后注冊(cè)的線程數(shù)量--->0
上面代碼中,當(dāng)所有線程進(jìn)行到arriveAndAwaitAdvance()時(shí)會(huì)觸發(fā)計(jì)數(shù)并且將線程阻塞,等計(jì)數(shù)數(shù)量等于注冊(cè)線程數(shù)量【即所有線程都執(zhí)行到了約定的地方時(shí),會(huì)放行,是所有線程得以繼續(xù)執(zhí)行,并觸發(fā)onAction事件】。我們可以在onAction中根據(jù)不同階段執(zhí)行不同內(nèi)容的操作。
實(shí)現(xiàn)分階段的CountDownLatch的功能
只需將上面的測(cè)試類更改如下:
package concurrent;
import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Phaser;
import static jodd.util.ThreadUtil.sleep;
public class PhaserTest {
private static final Logger logger = LogManager.getLogger(PhaserTest.class);
public static void main(String[] args) {
//這里其實(shí)相當(dāng)于已經(jīng)注冊(cè)了3個(gè)線程,但是并沒有實(shí)際的線程
int coutNum=3;
Phaser phaser = new Phaser(coutNum) {
/**此方法有2個(gè)作用:
* 1、當(dāng)每一個(gè)階段執(zhí)行完畢,此方法會(huì)被自動(dòng)調(diào)用,因此,重載此方法寫入的代碼會(huì)在每個(gè)階段執(zhí)行完畢時(shí)執(zhí)行,相當(dāng)于CyclicBarrier的barrierAction。
* 2、當(dāng)此方法返回true時(shí),意味著Phaser被終止,因此可以巧妙的設(shè)置此方法的返回值來(lái)終止所有線程。例如:若此方法返回值為 phase>=3,其含義為當(dāng)整個(gè)線程執(zhí)行了4個(gè)階段后,程序終止。
* */
@Override
protected Boolean onAdvance(int phase, int registeredParties) {
logger.debug("階段--->" + phase);
logger.debug("注冊(cè)的線程數(shù)量--->" + registeredParties);
return registeredParties==coutNum;
//當(dāng)后只剩下coutNum個(gè)線程時(shí)說(shuō)明所有真實(shí)的注冊(cè)的線程已經(jīng)運(yùn)行完成,測(cè)試可以終止Phaser
}
}
;
for (int i = 3; i > 0; i--) {
new PhaserThread("第" + i + "個(gè)", phaser).start();
}
//當(dāng)phaser未終止時(shí)循環(huán)注冊(cè)這塊兒可以使用實(shí)際的業(yè)務(wù)處理
while (!phaser.isTerminated()) {
sleep(1000);
logger.debug("觸發(fā)一次");
phaser.arrive();
//相當(dāng)于countDownLatch.countDown();
}
}
}
總結(jié)
以上就是本文關(guān)于Java多線程同步器代碼詳解的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站:
如有不足之處,歡迎留言指出。
- Java commons io包實(shí)現(xiàn)多線程同步圖片下載入門教程
- Java實(shí)現(xiàn)多線程同步五種方法詳解
- 淺析Java多線程同步synchronized
- 以銀行取錢為例模擬Java多線程同步問(wèn)題完整代碼
- java多線程之線程同步七種方式代碼示例
- Java多線程 線程同步與死鎖
- Java中多線程同步類 CountDownLatch
- Java中CountDownLatch進(jìn)行多線程同步詳解及實(shí)例代碼
- Java多線程編程中synchronized線程同步的教程
- 詳解Java多線程編程中的線程同步方法
- 五種Java多線程同步的方法
- Java多線程之線程同步
相關(guān)文章
spring-boot整合dubbo:Spring-boot-dubbo-starter
這篇文章主要介紹了spring-boot整合dubbo:Spring-boot-dubbo-starter的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2017-05-05
使用java自帶des加密算法實(shí)現(xiàn)文件加密和字符串加密
這篇文章主要介紹了使用java自帶des加密算法實(shí)現(xiàn)文件加密和字符串加密的示例,需要的朋友可以參考下2014-03-03
Jenkins配置自動(dòng)發(fā)送郵件過(guò)程圖解
這篇文章主要介紹了jenkins配置自動(dòng)發(fā)送郵件過(guò)程圖解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02
如何解決Maven出現(xiàn)Could not find artifact的問(wèn)題
這篇文章主要介紹了如何解決Maven出現(xiàn)Could not find artifact的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-04-04
詳解解決IDEA2020.1版本的lombok插件問(wèn)題
這篇文章主要介紹了詳解解決IDEA2020.1版本的lombok插件問(wèn)題。文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
Java?Web項(xiàng)目中如何添加Tomcat的Servlet-api.jar包(基于IDEA)
servlet-api.jar是在編寫servlet必須用到的jar包下面這篇文章主要給大家介紹了基于IDEAJava?Web項(xiàng)目中如何添加Tomcat的Servlet-api.jar包的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2024-04-04
Java中Bigdecimal類的toString()方法和toPlainString()方法區(qū)別
BigDecimal類有多個(gè)方法可以將其轉(zhuǎn)換為字符串,其中包括toString()和toPlainString(),本文主要介紹了Java中Bigdecimal類的toString()方法和toPlainString()方法區(qū)別,具有一定的參考價(jià)值,感興趣的可以了解一下2024-07-07
Spring用代碼來(lái)讀取properties文件實(shí)例解析
這篇文章主要介紹了Spring用代碼來(lái)讀取properties文件實(shí)例解析,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01

