Java中流式并行操作parallelStream的原理和使用方法
Java中流式并行操作parallelStream
0. 問題的產(chǎn)生
某天上線后,發(fā)現(xiàn)線上存在一些報(bào)錯(cuò),遂即自己嘗試線上操作,但是發(fā)現(xiàn)功能正常。追蹤相關(guān)的報(bào)錯(cuò)代碼行如下:
PointMissionPO missionPO = missionBO.getDbData();
該行報(bào)錯(cuò)為空指針異常,可以從代碼中判斷唯一能報(bào)出空指針異常的位置為missionBO為空,向上追蹤該引用:
for (PointMissionBO missionBO : result) {
// 循環(huán)體內(nèi)容
}
為一個(gè)列表List的循環(huán)體。于是接著向前追蹤引用,發(fā)現(xiàn)所有處理該列表引用的地方均使用了stream流式操作。經(jīng)常使用函數(shù)式編程的同學(xué)都知道,這很少會(huì)出現(xiàn)null對象在列表中。我通體檢查了一遍都未發(fā)現(xiàn)任何可能產(chǎn)生null對象的插入位置。很奇怪那么這個(gè)null對象是如何被加入到列表中的呢?
后來有個(gè)小伙伴經(jīng)過AI上下文分析,給出了可能的位置:
missionByType.entrySet().parallelStream()
.filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
.map(entry -> processMissions(ctmId, hruId, entry.getValue()))
.forEach(result::addAll);仔細(xì)一看居然使用了parallelStream并行處理,那么什么是parallelStream呢?為什么它會(huì)出現(xiàn)錯(cuò)誤呢?
1. 什么是parallelStream?
Java 8引入的Stream API提供了兩種處理方式:
- stream():串行處理,按順序處理元素
- parallelStream():并行處理,利用多核CPU將數(shù)據(jù)分割成多個(gè)部分并行處理
2. parallelStream的工作原理
parallelStream基于Fork/Join框架實(shí)現(xiàn):
- 將數(shù)據(jù)源分割成多個(gè)子任務(wù)(fork)
- 在不同線程上并行處理這些子任務(wù)
- 合并結(jié)果(join)
3. parallelStream的正確與錯(cuò)誤使用示例
錯(cuò)誤使用示例(來自我們的測試代碼):
List<String> result = new ArrayList<>();
missionByType.entrySet().parallelStream()
.map(entry -> processMissions(entry.getValue()))
.forEach(result::addAll); // 危險(xiǎn)操作!
問題分析:
- ArrayList不是線程安全的集合
- 多個(gè)線程同時(shí)調(diào)用result::addAll會(huì)產(chǎn)生競態(tài)條件
- 可能導(dǎo)致數(shù)據(jù)丟失、重復(fù)、甚至程序崩潰
正確使用方式一:使用同步塊
List<String> result = new ArrayList<>();
missionByType.entrySet().parallelStream()
.map(entry -> processMissions(entry.getValue()))
.filter(processedMissions -> !processedMissions.isEmpty())
.forEach(processedMissions -> {
synchronized (result) {
result.addAll(processedMissions);
}
});
正確使用方式二:使用收集器(推薦)
List<String> safeResult = missionByType.entrySet().parallelStream()
.flatMap(entry -> processMissions(entry.getValue()).stream())
.collect(Collectors.toList());
4. parallelStream在實(shí)際業(yè)務(wù)中的應(yīng)用
查看我們項(xiàng)目中的實(shí)際應(yīng)用案例:
// PointBusinessServiceImpl.java 中的實(shí)際使用
List<PointMissionBO> result = missionByType.entrySet().parallelStream()
.filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
.flatMap(entry -> processMissions(ctmId, hruId, entry.getValue()).stream())
.collect(Collectors.toList());
這種方式的優(yōu)點(diǎn):
- 使用flatMap展平數(shù)據(jù)結(jié)構(gòu)
- 使用collect收集結(jié)果,避免線程安全問題
- 提高了任務(wù)處理效率
5. parallelStream適用場景與注意事項(xiàng)
適用場景:
- 數(shù)據(jù)量較大(通常萬級(jí)以上)
- 計(jì)算密集型操作
- 無狀態(tài)操作(函數(shù)式編程)
- 不依賴處理順序的操作
不適用場景:
- 數(shù)據(jù)量?。ú⑿虚_銷可能超過收益)
- IO密集型操作
- 有狀態(tài)共享操作
- 需要保證處理順序的場景
注意事項(xiàng):
- 線程安全:避免在并行流中使用非線程安全的對象
- 副作用:避免在流操作中修改外部狀態(tài)
- 性能考量:并行不一定比串行快,需根據(jù)實(shí)際情況評估
- 資源競爭:注意共享資源的訪問控制
6. 最佳實(shí)踐建議
- 優(yōu)先考慮collect:使用收集器而不是直接修改共享集合
- 避免副作用:確保流操作是無狀態(tài)的純函數(shù)
- 合理選擇數(shù)據(jù)結(jié)構(gòu):使用適合并行處理的數(shù)據(jù)結(jié)構(gòu)
- 測試性能:在實(shí)際環(huán)境中測試并行處理效果
- 監(jiān)控資源使用:關(guān)注CPU和內(nèi)存使用情況
以上就是關(guān)于parallelStream并行處理的AI輸出內(nèi)容。下面結(jié)合實(shí)際問題發(fā)生的場景進(jìn)行推演和論述。
7. 其他思考
上面說了這么多,核心重點(diǎn)在于:parallelStream中使用線程不安全的對象操作時(shí)會(huì)出現(xiàn)異常。那么具體是什么樣的異常呢?
其實(shí)我們仔細(xì)想想非內(nèi)存安全List的內(nèi)存管理,熟悉八股文的同學(xué)應(yīng)該一下子就懂了,沒錯(cuò)就是內(nèi)存擴(kuò)展機(jī)制。
當(dāng)一個(gè)非內(nèi)存安全的List在觸發(fā)到內(nèi)存擴(kuò)展閾值的時(shí)候,就會(huì)觸發(fā)一次內(nèi)存擴(kuò)展。具體原理就是開辟一個(gè)更長的(通常是2倍)列表,然后將原數(shù)據(jù)賦值到新列表中。這個(gè)過程中,如果出現(xiàn)了并行開辟的情況,那賦值的內(nèi)容以及目標(biāo)列表就會(huì)變得混亂,出現(xiàn)null對象也并非不可能。因此當(dāng)嘗試指定初始列表大小為256的情況下,第0章的錯(cuò)誤就自然消失了。
那么也就出現(xiàn)一些關(guān)于Collection的使用建議:
- 初始化List時(shí),最好能夠有一個(gè)預(yù)估的列表大小并指定,其實(shí)所有Collection對象都可以這么做。
- 但凡出現(xiàn)非線程安全的Collection對象需要參與并行計(jì)算時(shí),都需要注意它的數(shù)據(jù)正確性應(yīng)當(dāng)如何保證,如果無法自行控制,或者控制有難度的,可以考慮使用Concurrent包中的內(nèi)容。
到此這篇關(guān)于Java中流式并行操作parallelStream的原理和使用方法的文章就介紹到這了,更多相關(guān)java并行parallelStream內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中List轉(zhuǎn)Map的幾種常見方式與對比
JavaList轉(zhuǎn)Map是一個(gè)非常常用的技術(shù),對于Java開發(fā)人員來講,掌握該技術(shù)可以幫助我們更加高效地操作List集合中的對象,這篇文章主要給大家介紹了關(guān)于Java中List轉(zhuǎn)Map的幾種常見方式與對比的相關(guān)資料,需要的朋友可以參考下2024-02-02
線程池之newCachedThreadPool可緩存線程池的實(shí)例
這篇文章主要介紹了線程池之newCachedThreadPool可緩存線程池的實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06
深入了解Spring Boot2.3.0及以上版本的Liveness和Readiness功能
這篇文章主要介紹了Spring Boot2.3.0及以上版本的Liveness和Readiness功能示例深入解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
IDEA無法打開Marketplace的三種解決方案(推薦)
這篇文章主要介紹了IDEA無法打開Marketplace的三種解決方案(推薦),本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11
spring boot 防止重復(fù)提交實(shí)現(xiàn)方法詳解
這篇文章主要介紹了spring boot 防止重復(fù)提交實(shí)現(xiàn)方法,結(jié)合實(shí)例形式詳細(xì)分析了spring boot 防止重復(fù)提交具體配置、實(shí)現(xiàn)方法及操作注意事項(xiàng),需要的朋友可以參考下2019-11-11
Java項(xiàng)目中如何引入Hutool工具類并正確使用它
Hutool是一個(gè)小而全的Java工具類庫,通過靜態(tài)方法封裝,降低相關(guān)API的學(xué)習(xí)成本,提高工作效率,使Java擁有函數(shù)式語言般的優(yōu)雅,這篇文章主要給大家介紹了關(guān)于Java項(xiàng)目中如何引入Hutool工具類并正確使用它的相關(guān)資料,需要的朋友可以參考下2024-01-01
SpringBoot使用thymeleaf實(shí)現(xiàn)前端表格
雖然現(xiàn)在流行前后端分離,但是后端模版在一些關(guān)鍵地方還是非常有用的,例如郵件模版、代碼模版等。當(dāng)然也不排除一些古老的項(xiàng)目后端依然使用動(dòng)態(tài)模版。Thymeleaf 簡潔漂亮、容易理解,并且完美支持 HTML5,可以直接打開靜態(tài)頁面,同時(shí)不新增標(biāo)簽,只需增強(qiáng)屬性2022-10-10
深入解析Java的Struts框架中的控制器DispatchAction
這篇文章主要介紹了深入解析Java的Struts框架中的控制器DispatchAction,Struts是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2015-12-12

