java并行流處理具體方案講解
當(dāng)我們需要調(diào)用多次調(diào)用第三方接口,我們采用并行流處理的方式發(fā)送請(qǐng)求。
實(shí)際場(chǎng)景:第三方給了一個(gè)表,我需要對(duì)遍歷整個(gè)表,調(diào)用表中的所有地址并解析json獲取數(shù)據(jù)
常規(guī)思維:如果使用for循環(huán)遍歷,一次發(fā)送http請(qǐng)求,所有請(qǐng)求都是串行,如果一個(gè)請(qǐng)求需要1秒,25個(gè)請(qǐng)求就需要25秒。
解法方案:思路 :異步請(qǐng)求,多線程發(fā)送http。
具體方案:
方案一:自定義線程池,定義請(qǐng)求方法,使用for循環(huán)講請(qǐng)求添加到線程詞。
方案二(推薦):異步請(qǐng)求注解,使用springboot的@Async注解,完美解決,編碼量少。
方案三:并行流(Parallel Stream)。
以下是方案三詳解:
并行流(Parallel Stream)深度解析
一、并行流是什么?
并行流是Java 8引入的一個(gè)特性,它允許你將流操作并行執(zhí)行,利用多核處理器的優(yōu)勢(shì)來(lái)加速處理。
java
// 順序流 vs 并行流
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 1. 順序流(默認(rèn))
int sumSequential = numbers.stream()
.reduce(0, Integer::sum);
// 2. 并行流
int sumParallel = numbers.parallelStream()
.reduce(0, Integer::sum);
// 3. 將順序流轉(zhuǎn)為并行流
int sumParallel2 = numbers.stream()
.parallel()
.reduce(0, Integer::sum);二、并行流的核心原理
1. 底層框架:Fork/Join框架
java
// 并行流背后是這個(gè)框架
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println("并行度: " + commonPool.getParallelism()); // 通常是CPU核心數(shù)-12. 工作竊取算法(Work-Stealing)
text
線程A:│任務(wù)1│任務(wù)2│任務(wù)3│任務(wù)4│
線程B:│任務(wù)5│任務(wù)6│ │ ← 空閑
↓
線程B從線程A的隊(duì)尾"偷取"任務(wù)3來(lái)執(zhí)行3. 數(shù)據(jù)拆分策略
java
// 并行流自動(dòng)將數(shù)據(jù)拆分為多個(gè)子任務(wù) Spliterator<T> spliterator = list.spliterator(); // 查看分割特性 int characteristics = spliterator.characteristics(); // SIZED: 知道確切大小 // SUBSIZED: 分割后的大小也確切 // ORDERED: 有順序要求
三、基本使用模式
1. 創(chuàng)建并行流
java
// 方式1:從集合創(chuàng)建
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> parallelStream = list.parallelStream();
// 方式2:將順序流轉(zhuǎn)為并行流
Stream<String> parallelStream2 = list.stream().parallel();
// 方式3:從數(shù)組創(chuàng)建
String[] array = {"a", "b", "c"};
Stream<String> parallelStream3 = Arrays.stream(array).parallel();2. 常用操作示例
映射操作(map)
java
List<String> words = Arrays.asList("hello", "world", "java", "stream");
// 順序處理
List<String> upperCaseSequential = words.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
// 并行處理
List<String> upperCaseParallel = words.parallelStream()
.map(String::toUpperCase)
.collect(Collectors.toList());過(guò)濾操作(filter)
java
List<Integer> numbers = IntStream.range(1, 1000000).boxed().collect(Collectors.toList());
// 并行過(guò)濾偶數(shù)
List<Integer> evenNumbers = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.collect(Collectors.toList());歸約操作(reduce)
java
// 計(jì)算1到1000000的和(并行版本更快)
long sum = LongStream.rangeClosed(1, 1000000)
.parallel()
.reduce(0, Long::sum);四、你的代碼中并行流的應(yīng)用
java
private List<Map<String, String>> getInterlockResultWen13or24Level1PointValueParallel(
Map<String, String> map,
List<IntermediateTable> list
) {
// 1. 建立查找表(提升效率)
Map<String, String> codeToTagNumber = list.stream()
.collect(Collectors.toMap(
IntermediateTable::getInterlockWen13or24Level1Point,
IntermediateTable::getTagNumber,
(v1, v2) -> v1 // 如果有重復(fù)key,取第一個(gè)
));
// 2. 使用并行流處理
return map.entrySet()
.parallelStream() // ← 關(guān)鍵!開(kāi)啟并行
.map(entry -> {
String code = entry.getKey();
String name = entry.getValue();
String tagNumber = codeToTagNumber.get(code);
// 3. 執(zhí)行IO操作(HTTP請(qǐng)求)
String value = tagNumber != null ? getSupOSValue(tagNumber) : "0";
// 4. 構(gòu)建結(jié)果
Map<String, String> resMap = new LinkedHashMap<>();
resMap.put("name", name);
resMap.put("value", value);
return resMap;
})
.collect(Collectors.toList()); // 收集結(jié)果
}五、性能對(duì)比:串行 vs 并行
java
public class ParallelStreamBenchmark {
public static void main(String[] args) {
// 模擬24次HTTP請(qǐng)求(用睡眠模擬)
List<Integer> requestIds = IntStream.range(1, 25).boxed().collect(Collectors.toList());
// 串行執(zhí)行
long start = System.currentTimeMillis();
List<String> sequentialResults = requestIds.stream()
.map(id -> mockHttpRequest(id))
.collect(Collectors.toList());
long sequentialTime = System.currentTimeMillis() - start;
// 并行執(zhí)行
start = System.currentTimeMillis();
List<String> parallelResults = requestIds.parallelStream()
.map(id -> mockHttpRequest(id))
.collect(Collectors.toList());
long parallelTime = System.currentTimeMillis() - start;
System.out.println("串行耗時(shí): " + sequentialTime + "ms");
System.out.println("并行耗時(shí): " + parallelTime + "ms");
System.out.println("加速比: " + (double)sequentialTime/parallelTime);
}
private static String mockHttpRequest(Integer id) {
try {
// 模擬HTTP請(qǐng)求延遲(100-500ms隨機(jī))
Thread.sleep(100 + (long)(Math.random() * 400));
return "Response_" + id;
} catch (InterruptedException e) {
return "Error";
}
}
}六、注意事項(xiàng)和最佳實(shí)踐
1. 什么時(shí)候使用并行流?
java
// ? 適合的場(chǎng)景 - 數(shù)據(jù)量大(>1000個(gè)元素) - 每個(gè)元素的處理是獨(dú)立的 - 處理操作是CPU密集型或IO密集型 - 沒(méi)有順序要求(或者可以使用unordered()放棄順序) // ? 不適合的場(chǎng)景 - 數(shù)據(jù)量小(<100個(gè)元素) - 處理操作簡(jiǎn)單(如簡(jiǎn)單的數(shù)學(xué)運(yùn)算) - 需要嚴(yán)格保持順序 - 操作有共享狀態(tài)(線程不安全)
2. 線程安全問(wèn)題
java
// ? 錯(cuò)誤示例:共享可變狀態(tài)
List<Integer> sharedList = new ArrayList<>();
numbers.parallelStream()
.forEach(sharedList::add); // 并發(fā)修改異常!
// ? 正確做法:使用線程安全收集器
List<Integer> safeList = numbers.parallelStream()
.collect(Collectors.toList());
// ? 或者使用同步集合
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
numbers.parallelStream()
.forEach(synchronizedList::add);3. 性能優(yōu)化技巧
java
// 1. 使用基礎(chǔ)類(lèi)型流避免裝箱拆箱
IntStream.range(1, 1000000)
.parallel()
.sum();
// 2. 使用unordered()放棄順序約束(可能提升性能)
list.parallelStream()
.unordered()
.filter(...)
.collect(Collectors.toList());
// 3. 合并操作(combiner)必須是可結(jié)合的
// reduce操作需要:a ⊕ b ⊕ c = a ⊕ (b ⊕ c)4. 控制并行度
java
// 方法1:修改系統(tǒng)屬性
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
// 方法2:使用自定義ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(4);
List<String> results = customPool.submit(() ->
list.parallelStream()
.map(item -> process(item))
.collect(Collectors.toList())
).get();七、在你的具體場(chǎng)景中的應(yīng)用建議
java
private List<Map<String, String>> getInterlockResultWen13or24Level1PointValueOptimized(
Map<String, String> map,
List<IntermediateTable> list
) {
// 1. 建立索引
Map<String, String> codeToTag = list.stream()
.collect(Collectors.toMap(
IntermediateTable::getInterlockWen13or24Level1Point,
IntermediateTable::getTagNumber,
(v1, v2) -> v1
));
// 2. 預(yù)計(jì)算需要請(qǐng)求的tagNumbers
List<String> tagNumbers = map.keySet().stream()
.map(codeToTag::get)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
// 3. 并行獲取所有值
Map<String, String> valueCache = tagNumbers.parallelStream()
.collect(Collectors.toMap(
tag -> tag,
this::getSupOSValue // 這是你的HTTP請(qǐng)求方法
));
// 4. 構(gòu)建結(jié)果
return map.entrySet().stream()
.map(entry -> {
String code = entry.getKey();
String tag = codeToTag.get(code);
String value = tag != null ? valueCache.getOrDefault(tag, "0") : "0";
Map<String, String> result = new LinkedHashMap<>();
result.put("name", entry.getValue());
result.put("value", value);
return result;
})
.collect(Collectors.toList());
}八、調(diào)試和監(jiān)控
java
// 1. 查看并行流使用的線程
List<String> results = list.parallelStream()
.peek(item -> System.out.println(Thread.currentThread().getName()))
.map(...)
.collect(Collectors.toList());
// 2. 監(jiān)控執(zhí)行時(shí)間
long start = System.nanoTime();
List<String> results = list.parallelStream()
.map(...)
.collect(Collectors.toList());
long duration = System.nanoTime() - start;
// 3. 使用JMH進(jìn)行基準(zhǔn)測(cè)試(推薦)
// 添加依賴(lài):org.openjdk.jmh九、常見(jiàn)陷阱
java
// 陷阱1:副作用操作
List<String> list = Arrays.asList("a", "b", "c");
StringBuilder builder = new StringBuilder();
list.parallelStream().forEach(builder::append); // ? 線程不安全
// 陷阱2:有狀態(tài)lambda
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> results = new ArrayList<>();
numbers.parallelStream()
.filter(n -> {
results.add(n); // ? 在lambda中修改外部狀態(tài)
return n % 2 == 0;
})
.collect(Collectors.toList());
// 陷阱3:順序依賴(lài)
Optional<Integer> first = numbers.parallelStream()
.filter(n -> n > 3)
.findFirst(); // ?? 并行下findFirst可能變慢
總結(jié)
到此這篇關(guān)于java并行流處理的文章就介紹到這了,更多相關(guān)java并行流處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于Java的電梯系統(tǒng)實(shí)現(xiàn)過(guò)程
這篇文章主要介紹了基于Java的電梯系統(tǒng)實(shí)現(xiàn)過(guò)程,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10
Java鏈表中添加元素的原理與實(shí)現(xiàn)方法詳解
這篇文章主要介紹了Java鏈表中添加元素的原理與實(shí)現(xiàn)方法,結(jié)合實(shí)例形式詳細(xì)分析了Java實(shí)現(xiàn)鏈表中添加元素的相關(guān)原理、操作技巧與注意事項(xiàng),需要的朋友可以參考下2020-03-03
使用java實(shí)現(xiàn)http多線程斷點(diǎn)下載文件(二)
下載工具我想沒(méi)有幾個(gè)人不會(huì)用的吧,前段時(shí)間比較無(wú)聊,花了點(diǎn)時(shí)間用java寫(xiě)了個(gè)簡(jiǎn)單的http多線程下載程序,我實(shí)現(xiàn)的這個(gè)http下載工具功能很簡(jiǎn)單,就是一個(gè)多線程以及一個(gè)斷點(diǎn)恢復(fù),當(dāng)然下載是必不可少的,需要的朋友可以參考下2012-12-12
關(guān)于Spring @Bean 相同加載順序不同結(jié)果不同的問(wèn)題記錄
本文主要探討了在Spring 5.1.3.RELEASE版本下,當(dāng)有兩個(gè)全注解類(lèi)定義相同類(lèi)型的Bean時(shí),由于加載順序不同,最終生成的Bean實(shí)例也會(huì)不同,文章通過(guò)分析ConfigurationClassPostProcessor的執(zhí)行過(guò)程,解釋了BeanDefinition的加載和覆蓋機(jī)制,感興趣的朋友一起看看吧2025-02-02
java通過(guò)Idea遠(yuǎn)程一鍵部署springboot到Docker詳解
這篇文章主要介紹了java通過(guò)Idea遠(yuǎn)程一鍵部署springboot到Docker詳解,Idea是Java開(kāi)發(fā)利器,springboot是Java生態(tài)中最流行的微服務(wù)框架,docker是時(shí)下最火的容器技術(shù),那么它們結(jié)合在一起會(huì)產(chǎn)生什么化學(xué)反應(yīng)呢?的相關(guān)資料2019-06-06
Java中常見(jiàn)的數(shù)據(jù)驗(yàn)證注解總結(jié)大全
在Java開(kāi)發(fā)中數(shù)據(jù)校驗(yàn)是確保應(yīng)用程序的數(shù)據(jù)完整性和一致性的重要步驟,這篇文章主要介紹了Java中常見(jiàn)的數(shù)據(jù)驗(yàn)證注解的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2025-07-07
Java8優(yōu)雅的字符串拼接工具類(lèi)StringJoiner實(shí)例代碼
這篇文章主要給大家介紹了關(guān)于Java8優(yōu)雅的字符串拼接工具類(lèi)StringJoiner的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02
SpringCloud之Zuul服務(wù)網(wǎng)關(guān)詳解
這篇文章主要介紹了SpringCloud之Zuul服務(wù)網(wǎng)關(guān)詳解,服務(wù)網(wǎng)關(guān)是微服務(wù)架構(gòu)中一個(gè)不可或缺的部分,通過(guò)服務(wù)網(wǎng)關(guān)統(tǒng)一向外系統(tǒng)提供REST?API的過(guò)程中,除了具備服務(wù)路由、均衡負(fù)載功能之外,它還具備了權(quán)限控制(鑒權(quán))等功能,需要的朋友可以參考下2023-08-08
Java中的任務(wù)調(diào)度框架quartz詳細(xì)解析
這篇文章主要介紹了Java中的任務(wù)調(diào)度框架quartz詳細(xì)解析,Quartz 是一個(gè)完全由 Java 編寫(xiě)的開(kāi)源作業(yè)調(diào)度框架,為在 Java 應(yīng)用程序中進(jìn)行作業(yè)調(diào)度提供了簡(jiǎn)單卻強(qiáng)大的機(jī)制,需要的朋友可以參考下2023-11-11

