Java中使用ForkJoinPool的實現(xiàn)示例
背景
使用ForkJoinPool去分解計算密集型任務(wù)且且并行地執(zhí)行他們以獲得更好的Java應(yīng)用程序的性能。
ForkJoinPool是一個功能強(qiáng)大的Java類,用于處理計算密集型任務(wù)。它的工作原理是將任務(wù)分解為更小的子任務(wù),然后并行執(zhí)行。該線程池使用分而治之策略進(jìn)行操作,使其能夠并發(fā)執(zhí)行任務(wù),從而提高吞吐量并減少處理時間。
ForkJoinPool的一個獨特的特點是它的工作竊取算法,用它來優(yōu)化性能。當(dāng)工作線程完成其分配的任務(wù)時,它將從其他線程竊取任務(wù),確保所有線程都能高效工作,并且不會浪費計算機(jī)資源。
ForkJoinPool廣泛用于Java的并行Stream流和CompletableFutures,使開發(fā)人員能夠輕松地同時執(zhí)行任務(wù)。此外,其他JVM語言(如Kotlin和Akka)使用此框架來構(gòu)建需要高并發(fā)性和彈性的消息驅(qū)動應(yīng)用程序。
1.使用ForkJoinPool的線程池
ForkJoinPool 類存儲工作線程Worker,Worker是在機(jī)器的每個CPU核心上運行的進(jìn)程。每個進(jìn)程都存儲在deque中,deque代表雙端隊列。一旦工作線程的任務(wù)用完,它就開始從其他工作線程竊取任務(wù)。
首先,將有一個分叉任務(wù)的過程;這意味著一個大任務(wù)將被分解為可以并行執(zhí)行的較小任務(wù)。所有子任務(wù)完成后,它們將重新加入。然后ForkJoinPool類提供一個結(jié)果,如圖1所示。

當(dāng)任務(wù)被提交到 ForkJoinPool 中,該進(jìn)程將被劃分為較小的進(jìn)程,并推送到共享隊列中。
一旦調(diào)用了fork()方法,就會并行調(diào)用任務(wù),直到基本條件為true。一旦處理被分叉,join()方法確保線程相互等待,直到進(jìn)程完成。
最初,所有任務(wù)都將提交到一個主隊列,該主隊列將把任務(wù)推送到工作線程。請注意,任務(wù)是使用LIFO(后進(jìn)先出)策略插入的,該策略與堆棧數(shù)據(jù)結(jié)構(gòu)相同。
另一個重要的點是ForkJoinPool使用deques來存儲任務(wù)。這提供了使用后進(jìn)先出或先進(jìn)先出(先進(jìn)先出)的能力,這對于竊取工作的算法是必要的。

2.工作竊取算法
ForkJoinPool中的工作竊取是一種有效的算法,通過平衡池中所有可用線程的工作負(fù)載,可以有效地使用計算機(jī)資源。
當(dāng)一個線程變?yōu)榭臻e時,它將嘗試從仍忙于分配工作的其他線程中竊取任務(wù),而不是保持非活動狀態(tài)。這個過程最大限度地利用了計算資源,并確保沒有線程負(fù)擔(dān)過重,而其他線程保持空閑。工作竊取算法背后的關(guān)鍵概念是,每個線程都有自己的任務(wù)組,并按后進(jìn)先出的順序執(zhí)行。當(dāng)一個線程完成自己的任務(wù)并變?yōu)榭臻e時,它將嘗試從另一個線程的deque的末尾“竊取”任務(wù),遵循FIFO策略,與隊列數(shù)據(jù)結(jié)構(gòu)相同。這允許空閑線程拾取等待時間最長的任務(wù),從而減少總體等待時間并提高吞吐量。
在下圖中,線程2通過輪詢線程1的deque中的最后一個元素,從線程1中竊取一個任務(wù),然后執(zhí)行該任務(wù)。被盜任務(wù)通常是deque中最古老的任務(wù),這確保了工作負(fù)載在池中的所有線程之間均勻分布。

總的來說,F(xiàn)orkJoinPool的工作竊取算法是一個強(qiáng)大的功能,它可以通過確保所有可用的計算資源都得到有效利用來顯著提高并行應(yīng)用程序的性能。
3.ForkJoinPool的主要類
讓我們快速了解一下支持使用ForkJoinPool進(jìn)行處理的主要類。
- ForkJoinPool:創(chuàng)建一個線程池來使用ForkJoin框架。它的工作原理與其他線程池類似。這個類中最重要的方法是commonPool(),它創(chuàng)建ForkJoin線程池。
- 遞歸操作:這個類的主要功能是計算遞歸操作。請記住,在compute()方法中,我們不會返回值。這是因為遞歸發(fā)生在compute()方法中。
- RecursiveTask:這個類的工作原理類似于RecursiveAction,不同之處在于compute()方法將返回一個值。
4.使用遞歸操作
要使用RecursiveAction功能,我們需要繼承它并重寫compute()方法。然后,我們用想要實現(xiàn)的邏輯創(chuàng)建子任務(wù)。
在下面的代碼示例中,我們將以并行和遞歸的方式計算數(shù)組中每個數(shù)字的兩倍。我們被限制為并行計算二乘二的數(shù)組元素。
正如您所看到的,fork()方法調(diào)用compute()方法。一旦整個數(shù)組的每個元素都得到了和,遞歸調(diào)用就會停止。一旦遞歸地求和了數(shù)組的所有元素,我們就會顯示結(jié)果。
Listing 1. An example of RecursiveAction
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class ForkJoinDoubleAction {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
int[] array = {1, 5, 10, 15, 20, 25, 50};
DoubleNumber doubleNumberTask = new DoubleNumber(array, 0, array.length);
// Invokes compute method
forkJoinPool.invoke(doubleNumberTask);
System.out.println(DoubleNumber.result);
}
}
class DoubleNumber extends RecursiveAction {
final int PROCESS_THRESHOLD = 2;
int[] array;
int startIndex, endIndex;
static int result;
DoubleNumber(int[] array, int startIndex, int endIndex) {
this.array = array;
this.startIndex = startIndex;
this.endIndex = endIndex;
}
@Override
protected void compute() {
if (endIndex - startIndex <= PROCESS_THRESHOLD) {
for (int i = startIndex; i < endIndex; i++) {
result += array[i] * 2;
}
} else {
int mid = (startIndex + endIndex) / 2;
DoubleNumber leftArray = new DoubleNumber(array, startIndex, mid);
DoubleNumber rightArray = new DoubleNumber(array, mid, endIndex);
// Invokes the compute method recursively
leftArray.fork();
rightArray.fork();
// Joins results from recursive invocations
leftArray.join();
rightArray.join();
}
}
}該計算的輸出為252。
RecursiveAction需要記住的一點是,它不會返回值。通過使用分而治之的策略來提高性能,也可以打破這個過程。
這就是我們在清單1中所做的,我們不是計算每個數(shù)組元素的二重,而是通過將數(shù)組分解為多個部分來并行計算。
同樣重要的是要注意,RecursiveAction在用于可以有效分解為較小子問題的任務(wù)時最有效。因此,RecursiveAction和ForkJoinPool應(yīng)該用于計算密集型任務(wù),在這些任務(wù)中,工作的并行化可以顯著提高性能。否則,由于線程的創(chuàng)建和管理,性能將更差。
5.資源任務(wù)
在下一個示例中,讓我們研究一個簡單的程序,它遞歸地在中間中斷,直到達(dá)到基本條件。在本例中,我們使用的是RecursiveTask類。RecursiveAction和RecursiveTask的區(qū)別在于,使用遞歸任務(wù),我們可以在compute()方法中返回一個值。Listing 2. An example of RecursiveTask
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinSumArrayTask extends RecursiveTask<Integer> {
private final List<Integer> numbers;
public ForkJoinSumArrayTask(List<Integer> numbers) {
this.numbers = numbers;
}
@Override
protected Integer compute() {
if (numbers.size() <= 2) {
return numbers.stream().mapToInt(e -> e).sum();
} else {
int mid = numbers.size() / 2;
List<Integer> list1 = numbers.subList(0, mid);
List<Integer> list2 = numbers.subList(mid, numbers.size());
ForkJoinSumArrayTask task1 = new ForkJoinSumArrayTask(list1);
ForkJoinSumArrayTask task2 = new ForkJoinSumArrayTask(list2);
task1.fork();
return task1.join() + task2.compute();
}
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
List<Integer> numbers = List.of(1, 3, 5, 7, 9);
int output = forkJoinPool.invoke(new ForkJoinSumArrayTask(numbers));
System.out.println(output);
}
}在這里,我們遞歸地分解中間的數(shù)組,直到它達(dá)到基本條件。
一旦我們破壞了主數(shù)組,我們將list1和list2發(fā)送到ForkJoinsMarrayTask,然后我們分叉task1,它并行執(zhí)行compute()方法和數(shù)組的其他部分。
一旦遞歸過程達(dá)到基本條件,就會調(diào)用連接方法,連接結(jié)果。這種情況下的輸出為25。
6.何時使用ForkJoinPool
ForkJoinPool不應(yīng)該在所有情況下都使用。如前所述,最好將其用于高度密集的并發(fā)進(jìn)程。讓我們具體看看這些情況是什么:
- 遞歸任務(wù):ForkJoinPool非常適合執(zhí)行遞歸算法,如快速排序、合并排序或二進(jìn)制搜索。這些算法可以分解成更小的子問題并并行執(zhí)行,這可以顯著提高性能。
- 令人尷尬的并行問題:如果你有一個問題可以很容易地劃分為獨立的子任務(wù),例如圖像處理或數(shù)值模擬,你可以使用ForkJoinPool并行執(zhí)行子任務(wù)。
- 高并發(fā)場景:在高并發(fā)場景中,如web服務(wù)器、數(shù)據(jù)處理管道或其他高性能應(yīng)用程序,可以使用ForkJoinPool跨多個線程并行執(zhí)行任務(wù),這有助于提高性能和吞吐量。
7.總結(jié)
在本文中,您了解了如何使用最重要的ForkJoinPool功能在單獨的CPU核心中執(zhí)行繁重的操作。讓我們以這篇文章的要點作為結(jié)論:
- ForkJoinPool是一個使用分治策略遞歸執(zhí)行任務(wù)的線程池。
- 它被諸如Kotlin和Akka之類的JVM語言用來構(gòu)建消息驅(qū)動的應(yīng)用程序。
- ForkJoinPool并行執(zhí)行任務(wù),從而實現(xiàn)計算機(jī)資源的高效使用。
- 工作竊取算法通過允許空閑線程從繁忙線程中竊取任務(wù)來優(yōu)化資源利用率。
- 任務(wù)存儲在一個雙端隊列中,LIFO策略用于存儲,F(xiàn)IFO用于竊取。
- ForkJoinPool框架中的主要類包括ForkJoinPool、RecursiveAction和RecursiveTask:
- RecursiveAction用于計算遞歸操作,不返回任何值。
- RecursiveTask類似,但返回一個值。
- compute()方法在兩個類中都被重寫以實現(xiàn)自定義邏輯。
- fork()方法調(diào)用compute()方法并將任務(wù)分解為更小的子任務(wù)。
- join()方法等待子任務(wù)完成并合并它們的結(jié)果。
- ForkJoinPool通常與并行流和CompletableFuture一起使用。
到此這篇關(guān)于Java中使用ForkJoinPool的實現(xiàn)示例的文章就介紹到這了,更多相關(guān)Java使用ForkJoinPool內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Zuul 實現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié)
這篇文章主要介紹了Zuul 實現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
Springboot中使用Filter實現(xiàn)Header認(rèn)證詳解
這篇文章主要介紹了Springboot中使用Filter實現(xiàn)Header認(rèn)證詳解,當(dāng)在?web.xml?注冊了一個?Filter?來對某個?Servlet?程序進(jìn)行攔截處理時,它可以決定是否將請求繼續(xù)傳遞給?Servlet?程序,以及對請求和響應(yīng)消息是否進(jìn)行修改,需要的朋友可以參考下2023-08-08
Java并發(fā)包線程池ThreadPoolExecutor的實現(xiàn)
本文主要介紹了Java并發(fā)包線程池ThreadPoolExecutor的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04
Springboot詳解RocketMQ實現(xiàn)消息發(fā)送與接收流程
這篇文章主要介紹了SpringBoot整合RocketMQ實現(xiàn)消息發(fā)送和接收功能,我們使用主流的SpringBoot框架整合RocketMQ來講解,使用方便快捷,本文分步驟給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06
解決IDEA Maven下載依賴時報錯ERROR - #org.jetbrains.ide
這篇文章主要介紹了解決IDEA Maven下載依賴時報錯ERROR - #org.jetbrains.idea.maven - Cannot reconnect.問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08

