淺談Java Fork/Join并行框架
初步了解Fork/Join框架
Fork/Join 框架是java7中加入的一個并行任務框架,可以將任務分割成足夠小的小任務,然后讓不同的線程來做這些分割出來的小事情,然后完成之后再進行join,將小任務的結果組裝成大任務的結果。下面的圖片展示了這種框架的工作模型:

使用Fork/Join并行框架的前提是我們的任務可以拆分成足夠小的任務,而且可以根據(jù)小任務的結果來組裝出大任務的結果,一個最簡單的例子是使用Fork/Join框架來求一個數(shù)組中的最大/最小值,這個任務就可以拆成很多小任務,大任務就是尋找一個大數(shù)組中的最大/最小值,我們可以將一個大數(shù)組拆成很多小數(shù)組,然后分別求解每個小數(shù)組中的最大/最小值,然后根據(jù)這些任務的結果組裝出最后的最大最小值,下面的代碼展示了如何通過Fork/Join求解數(shù)組的最大值:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Created by hujian06 on 2017/9/28.
*
* fork/join demo
*/
public class ForkJoinDemo {
/**
* how to find the max number in array by Fork/Join
*/
private static class MaxNumber extends RecursiveTask<Integer> {
private int threshold = 2;
private int[] array; // the data array
private int index0 = 0;
private int index1 = 0;
public MaxNumber(int[] array, int index0, int index1) {
this.array = array;
this.index0 = index0;
this.index1 = index1;
}
@Override
protected Integer compute() {
int max = Integer.MIN_VALUE;
if ((index1 - index0) <= threshold) {
for (int i = index0;i <= index1; i ++) {
max = Math.max(max, array[i]);
}
} else {
//fork/join
int mid = index0 + (index1 - index0) / 2;
MaxNumber lMax = new MaxNumber(array, index0, mid);
MaxNumber rMax = new MaxNumber(array, mid + 1, index1);
lMax.fork();
rMax.fork();
int lm = lMax.join();
int rm = rMax.join();
max = Math.max(lm, rm);
}
return max;
}
}
public static void main(String ... args) throws ExecutionException, InterruptedException, TimeoutException {
ForkJoinPool pool = new ForkJoinPool();
int[] array = {100,400,200,90,80,300,600,10,20,-10,30,2000,1000};
MaxNumber task = new MaxNumber(array, 0, array.length - 1);
Future<Integer> future = pool.submit(task);
System.out.println("Result:" + future.get(1, TimeUnit.SECONDS));
}
}
可以通過設置不同的閾值來拆分成小任務,閾值越小代表拆出來的小任務越多。
工作竊取算法
Fork/Join在實現(xiàn)上,大任務拆分出來的小任務會被分發(fā)到不同的隊列里面,每一個隊列都會用一個線程來消費,這是為了獲取任務時的多線程競爭,但是某些線程會提前消費完自己的隊列。而有些線程沒有及時消費完隊列,這個時候,完成了任務的線程就會去竊取那些沒有消費完成的線程的任務隊列,為了減少線程競爭,F(xiàn)ork/Join使用雙端隊列來存取小任務,分配給這個隊列的線程會一直從頭取得一個任務然后執(zhí)行,而竊取線程總是從隊列的尾端拉取task。
Frok/Join框架的實現(xiàn)細節(jié)
在上面的示例代碼中,我們發(fā)現(xiàn)Fork/Join的任務是通過ForkJoinPool來執(zhí)行的,所以框架的一個核心是任務的fork和join,然后就是這個ForkJoinPool。關于任務的fork和join,我們可以想象,而且也是由我們的代碼自己控制的,所以要分析Fork/Join,那么ForkJoinPool最值得研究。

上面的圖片展示了ForkJoinPool的類關系圖,可以看到本質(zhì)上它就是一個Executor。在ForkJoinPool里面,有兩個特別重要的成員如下:
volatile WorkQueue[] workQueues; final ForkJoinWorkerThreadFactory factory;
workQueues 用于保存向ForkJoinPool提交的任務,而具體的執(zhí)行有ForkJoinWorkerThread執(zhí)行,而ForkJoinWorkerThreadFactory可以用于生產(chǎn)出ForkJoinWorkerThread??梢钥匆恍〧orkJoinWorkerThread,可以發(fā)現(xiàn)每一個ForkJoinWorkerThread會有一個pool和一個workQueue,和我們上面描述的是一致的,每個線程都被分配了一個任務隊列,而執(zhí)行這個任務隊列的線程由pool提供。
下面我們看一下當我們fork的時候發(fā)生了什么:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
看上面的fork代碼,可以看到首先取到了當前線程,然后判斷是否是我們的ForkJoinPool專用線程,如果是,則強制類型轉(zhuǎn)換(向下轉(zhuǎn)換)成ForkJoinWorkerThread,然后將任務push到這個線程負責的隊列里面去。如果當前線程不是ForkJoinWorkerThread類型的線程,那么就會走else之后的邏輯,大概的意思是首先嘗試將任務提交給當前線程,如果不成功,則使用例外的處理方法,關于底層實現(xiàn)較為復雜,和我們使用Fork/Join關系也不太大,如果希望搞明白具體原理,可以看源碼。
下面看一下join的流程:
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}
上面展示了主要的調(diào)用鏈路,我們發(fā)現(xiàn)最后落到了我們在代碼里編寫的compute方法,也就是執(zhí)行它,所以,我們需要知道的一點是,fork僅僅是分割任務,只有當我們執(zhí)行join的時候,我們的額任務才會被執(zhí)行。
如何使用Fork/Join并行框架
前文首先展示了一個求數(shù)組中最大值得例子,然后介紹了“工作竊取算法”,然后分析了Fork/Join框架的一些細節(jié),下面才是我們最關心的,怎么使用Fork/Join框架呢?
為了使用Fork/Join框架,我們只需要繼承類RecursiveTask或者RecursiveAction。前者適用于有返回值的場景,而后者適合于沒有返回值的場景。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
帶你了解Java數(shù)據(jù)結構和算法之無權無向圖
這篇文章主要為大家介紹了Java數(shù)據(jù)結構和算法之無權無向圖?,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-01-01
Java操作SSH2實現(xiàn)遠程執(zhí)行l(wèi)inux命令
這篇文章主要為大家詳細介紹了Java如何操作SSH2實現(xiàn)遠程執(zhí)行l(wèi)inux命令,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下2025-01-01
mybatis實現(xiàn)讀取樹結構數(shù)據(jù)實例代碼
這篇文章主要介紹了mybatis實現(xiàn)讀取樹結構數(shù)據(jù)實例代碼的相關資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2016-09-09
spring security中的csrf防御原理(跨域請求偽造)
這篇文章主要介紹了spring security中的csrf防御機制原理解析(跨域請求偽造),本文通過實例代碼詳解的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-12-12
SpringBoot集成Hutool防止XSS攻擊的兩種解決方法
XSS漏洞是生產(chǎn)上比較常見的問題,本文主要介紹了SpringBoot集成Hutool防止XSS攻擊的兩種解決方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2024-04-04
詳解Jenkins 實現(xiàn)Gitlab事件自動觸發(fā)Jenkins構建及釘釘消息推送
這篇文章主要介紹了Jenkins 實現(xiàn)Gitlab事件自動觸發(fā)Jenkins構建及釘釘消息推送,應該會對大家學習Jenkins有所啟發(fā)2021-04-04

