Java Stream 并行流簡介、使用與注意事項小結(jié)
1. 并行流簡介
Java 8 引入了 Stream API,提供了一種高效的數(shù)據(jù)處理方式。而 ?并行流(Parallel Stream)? 則是 Stream 的并行版本,能夠?qū)⒘鞑僮鞣峙涞蕉鄠€線程中執(zhí)行,充分利用多核 CPU 的性能。
?特點(diǎn):
- 默認(rèn)使用
ForkJoinPool.commonPool()執(zhí)行任務(wù)。 - 適合處理 ?計算密集型 任務(wù)。
- 任務(wù)執(zhí)行順序不確定。
?2. 并行流的簡單使用
將普通流轉(zhuǎn)換為并行流非常簡單,只需調(diào)用 parallel() 方法即可。
?示例:并行流的基本使用
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 將流轉(zhuǎn)換為并行流
numbers.parallelStream()
.forEach(num -> System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + num));
}
}輸出示例:
線程: main, 處理: 6
線程: ForkJoinPool.commonPool-worker-1, 處理: 3
線程: ForkJoinPool.commonPool-worker-2, 處理: 8
...
?3. 配合自定義線程池
默認(rèn)情況下,并行流使用 ForkJoinPool.commonPool() 執(zhí)行任務(wù)。你可以通過自定義線程池來控制并行流的執(zhí)行環(huán)境。
?示例:自定義線程池
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamCustomPool {
public static void main(String[] args) {
// 創(chuàng)建自定義線程池
ForkJoinPool customPool = new ForkJoinPool(4);
// 在自定義線程池中執(zhí)行并行流任務(wù)
customPool.submit(() -> {
List<Integer> result = IntStream.rangeClosed(1, 10)
.parallel()
.map(i -> {
System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + i);
return i * 2;
})
.boxed()
.collect(Collectors.toList());
System.out.println("結(jié)果: " + result);
}).join(); // 等待任務(wù)完成
customPool.shutdown(); // 關(guān)閉線程池
}
}?示例:配合CompletableFuture實(shí)現(xiàn)異步
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamWithCompletableFuture {
public static void main(String[] args) {
// 創(chuàng)建一個并行流
List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(1, 10)
.parallel()
.mapToObj(i -> CompletableFuture.supplyAsync(() -> {
System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + i);
return i * 2; // 模擬計算任務(wù)
}))
.collect(Collectors.toList());
// 等待所有任務(wù)完成并獲取結(jié)果
List<Integer> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println("結(jié)果: " + results);
}
}好處:
- ?并行流:適合處理數(shù)據(jù)流中的計算密集型任務(wù),能夠自動將任務(wù)分配到多個線程中執(zhí)行。
- ?CompletableFuture:提供強(qiáng)大的異步編程能力,可以處理任務(wù)的依賴關(guān)系、異常處理、結(jié)果合并等。
結(jié)合兩者的優(yōu)勢,可以實(shí)現(xiàn):
- ?異步并行處理:將并行流的任務(wù)異步化,進(jìn)一步提升性能。
- ?任務(wù)依賴管理:通過
CompletableFuture管理任務(wù)之間的依賴關(guān)系。 - ?結(jié)果合并:將多個任務(wù)的結(jié)果合并處理。
?4. 控制有序性
并行流的任務(wù)執(zhí)行順序是不確定的。如果需要保持順序,可以使用 forEachOrdered() 方法。
?示例:保持順序
import java.util.Arrays;
import java.util.List;
public class ParallelStreamOrder {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers.parallelStream()
.forEachOrdered(System.out::println); // 輸出順序與流中元素順序一致
}
}?5. 共享資源的安全性
并行流在多個線程中執(zhí)行操作,如果操作共享可變狀態(tài),可能會導(dǎo)致線程安全問題。
?示例:線程安全問題
import java.util.ArrayList;
import java.util.List;
public class ParallelStreamThreadSafety {
public static void main(String[] args) {
List<Integer> result = new ArrayList<>();
IntStream.rangeClosed(1, 1000)
.parallel()
.forEach(result::add); // 這里會出現(xiàn)線程安全問題
System.out.println("結(jié)果大小: " + result.size()); // 結(jié)果可能小于 1000
}
}解決方法:
- 使用線程安全的集合,如
Collections.synchronizedList()。 - 使用
collect()方法將結(jié)果收集到線程安全的容器中。
?示例:線程安全的解決方案
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamThreadSafety {
public static void main(String[] args) {
List<Integer> result = IntStream.rangeClosed(1, 1000)
.parallel()
.boxed()
.collect(Collectors.toList()); // 使用 collect() 方法
System.out.println("結(jié)果大小: " + result.size()); // 輸出: 1000
}
}?6. 注意事項
- ?任務(wù)類型:
- 適合 ?計算密集型 任務(wù),不適合 ?I/O 密集型 任務(wù)。
- ?線程安全:
- 避免在并行流中操作共享可變狀態(tài)。
- ?任務(wù)順序:
- 并行流的任務(wù)執(zhí)行順序不確定,使用
forEachOrdered()保持順序。
- 并行流的任務(wù)執(zhí)行順序不確定,使用
- ?線程池管理:
- 使用自定義線程池時,記得關(guān)閉線程池,避免資源泄漏。
?7. 總結(jié)
并行流是 Java 8 提供的一個強(qiáng)大工具,能夠顯著提升數(shù)據(jù)處理性能。但在使用時需要注意線程安全、任務(wù)順序和線程池管理等問題。通過合理使用并行流,可以編寫高效、靈活的代碼。
?附錄:完整代碼
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamDemo {
public static void main(String[] args) {
// 基本使用
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers.parallelStream()
.forEach(num -> System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + num));
// 自定義線程池
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> {
List<Integer> result = IntStream.rangeClosed(1, 10)
.parallel()
.map(i -> {
System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + i);
return i * 2;
})
.boxed()
.collect(Collectors.toList());
System.out.println("結(jié)果: " + result);
}).join();
customPool.shutdown();
// 保持順序
numbers.parallelStream()
.forEachOrdered(System.out::println);
// 線程安全
List<Integer> safeResult = IntStream.rangeClosed(1, 1000)
.parallel()
.boxed()
.collect(Collectors.toList());
System.out.println("結(jié)果大小: " + safeResult.size());
}
}希望這篇文章能幫助你更好地理解和使用 Java 的并行流!如果有任何問題,歡迎在評論區(qū)討論!
到此這篇關(guān)于Java Stream 并行流簡介、使用與注意事項小結(jié)的文章就介紹到這了,更多相關(guān)Java Stream 并行流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot?獲取請求參數(shù)的常用注解及用法
Spring?Boot通過@RequestParam、@PathVariable等注解支持從HTTP請求中獲取參數(shù),涵蓋查詢、路徑、請求體、頭、Cookie等類型,還可綁定到對象或使用Servlet?API,需注意參數(shù)匹配規(guī)則及配置支持隱藏方法過濾器,本文介紹SpringBoot獲取請求參數(shù)的常用注解,感興趣朋友一起看看吧2025-07-07
SpringBoot種如何使用?EasyExcel?實(shí)現(xiàn)自定義表頭導(dǎo)出并實(shí)現(xiàn)數(shù)據(jù)格式化轉(zhuǎn)換
本文詳細(xì)介紹了如何使用EasyExcel工具類實(shí)現(xiàn)自定義表頭導(dǎo)出,并實(shí)現(xiàn)數(shù)據(jù)格式化轉(zhuǎn)換與添加下拉框操作,通過示例和代碼,展示了如何處理不同數(shù)據(jù)結(jié)構(gòu)和注解,確保數(shù)據(jù)在導(dǎo)出時能夠正確顯示和格式化,此外,還介紹了如何解決特定數(shù)據(jù)類型的轉(zhuǎn)換問題,并提供了解決方案2024-11-11
Java FtpClient 實(shí)現(xiàn)文件上傳服務(wù)
本文主要對Java FtpClient實(shí)現(xiàn)簡單的圖片上傳到服務(wù)器的方法進(jìn)行介紹,并且展示的小demo中,對配置過程中主要碰到的問題:關(guān)于文件權(quán)限的問題也進(jìn)行了說明,下面跟著小編一起來看下吧2016-12-12
SpringBoot實(shí)現(xiàn)application.yml文件敏感信息加密
本文主要介紹了SpringBoot實(shí)現(xiàn)application.yml文件敏感信息加密,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07
mybatis plus 開啟sql日志打印的方法小結(jié)
Mybatis-Plus(簡稱MP)是一個 Mybatis 的增強(qiáng)工具,在 Mybatis 的基礎(chǔ)上只做增強(qiáng)不做改變,為簡化開發(fā)、提高效率而生。本文重點(diǎn)給大家介紹mybatis plus 開啟sql日志打印的方法小結(jié),感興趣的朋友一起看看吧2021-09-09
Spring Cache + Caffeine的整合與使用示例詳解
對于一些項目里需要對數(shù)據(jù)庫里的某些數(shù)據(jù)一直重復(fù)請求的,且這些數(shù)據(jù)基本是固定的,在這種情況下,可以借助簡單使用本地緩存來緩存這些數(shù)據(jù),本文介紹一下Spring Cache和Caffeine的使用,感興趣的朋友一起看看吧2023-12-12
IDEA?mybatis?Mapper.xml報紅的最新解決辦法
這篇文章主要介紹了IDEA?mybatis?Mapper.xml報紅的解決辦法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-04-04

