并行Stream與Spring事務(wù)相遇會(huì)發(fā)生什么?
前言:
事情是這樣的:運(yùn)營(yíng)人員反饋,通過(guò)Excel導(dǎo)入數(shù)據(jù)時(shí),有一部分成功了,有一部分未導(dǎo)入。初步猜測(cè),是事務(wù)未生效導(dǎo)致的。查看代碼,發(fā)現(xiàn)導(dǎo)入部分已經(jīng)通過(guò)@Transcational注解進(jìn)行事務(wù)控制了,為什么還會(huì)出現(xiàn)事務(wù)不生效的問(wèn)題呢?下面我們就進(jìn)行具體的案例分析,Let's go!
事務(wù)不生效的代碼
這里寫一段簡(jiǎn)單的偽代碼來(lái)演示展示一下事務(wù)不生效的代碼:
@Transactional(rollbackFor = Exception.class)
public void batchInsert(List<Order> list) {
list.parallelStream().forEach(order -> orderMapper.save(order));
}邏輯很簡(jiǎn)單,遍歷list,然后批量插入Order數(shù)據(jù)到數(shù)據(jù)庫(kù)。在該方法上使用@Transactional來(lái)聲明出現(xiàn)異常時(shí)進(jìn)行回滾。
但事實(shí)情況是,其中某一條數(shù)據(jù)執(zhí)行異常時(shí),事務(wù)并沒(méi)有進(jìn)行回滾。這到底是為什么呢?
下面一探究竟。
JDK 8 的Stream
上面代碼中涉及到了兩個(gè)知識(shí)點(diǎn):parallelStream和@Transactional,我們先來(lái)鋪墊一下parallelStream相關(guān)知識(shí)。
在JDK8 中引入了Stream API的概念和實(shí)現(xiàn),這里的Stream有別于 InputStream 和OutputStream,Stream API 是處理對(duì)象流而不是字節(jié)流。
比如,我們可以通過(guò)如下方式來(lái)基于Stream進(jìn)行實(shí)現(xiàn):
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); numbers.stream().forEach(num->System.out.println(num));
輸出:1 2 3 4 5 6 7 8 9
代碼看起來(lái)方便清爽多了。
關(guān)于Stream的基本處理流程如下:

在這些Stream API中,還提供了一個(gè)并行處理的API,也就是parallelStream。它可以將任務(wù)拆分子任務(wù),分發(fā)給多個(gè)處理器同時(shí)處理,之后合并。這樣做的目的很明顯是為了提升處理效率。

parallelStream的基本使用方式如下:
// 并行執(zhí)行流 list.stream().parallel().filter(e -> e > 10).count()
針對(duì)上述代碼,對(duì)應(yīng)的流程如下:

而parallelStream會(huì)將流劃分成多個(gè)子流,分散到不同的CPU并行處理,然后合并處理結(jié)果。其中,parallelStream默認(rèn)是基于ForkJoinPool.commonPool()線程池來(lái)實(shí)現(xiàn)并行處理的。
通常情況下,我們可以認(rèn)為并行會(huì)比串行快,但還是有前提條件的:
- 處理器核心數(shù)量:并行處理核心數(shù)越多,處理效率越高;
- 處理數(shù)據(jù)量:處理數(shù)據(jù)量越大優(yōu)勢(shì)越明顯;
但并行處理也面臨著一系列的問(wèn)題,比如:資源競(jìng)爭(zhēng)、死鎖、線程切換、事務(wù)、可見(jiàn)性、線程安全等問(wèn)題。
@Transactional事務(wù)處理
上面了解了parallelStream的基本原理及特性之后,再來(lái)看看@Transactional的事務(wù)處理特性。
@Transactional是Spring提供的基于注解的一種聲明式事務(wù)方式,該注解只能運(yùn)用到public的方法上。
基本原理:當(dāng)一個(gè)方法被@Transactional注解之后,Spring會(huì)基于AOP在方法執(zhí)行之前開(kāi)啟一個(gè)事務(wù)。當(dāng)方法執(zhí)行完畢之后,根據(jù)方法是否報(bào)錯(cuò),來(lái)決定回滾或提交事務(wù)。
在默認(rèn)代理模式下,只有目標(biāo)方法由外部方法調(diào)用時(shí),才能被Spring的事務(wù)攔截器攔截。所以,在同一個(gè)類中的兩個(gè)方法直接調(diào)用,不會(huì)被Spring的事務(wù)攔截器攔截。這是事務(wù)不生效的一個(gè)場(chǎng)景,但在上述案例中,并不存在這種情況。
Spring在處理事務(wù)時(shí),會(huì)從連接池中獲得一個(gè)jdbc connection,將連接綁定到線程上(基于ThreadLocal),那么同一個(gè)線程中用到的就是同一個(gè)connection了。具體實(shí)現(xiàn)在DataSourceTransactionManager#doBegin方法中。
Bug綜合分析
在了解了parallelStream和@Transactional的相關(guān)知識(shí)之后,我們會(huì)發(fā)現(xiàn):parallelStream處理時(shí)開(kāi)啟了多線程,而@Transactional在處理事務(wù)時(shí)會(huì)(基于ThreadLocal)將連接綁定到當(dāng)前線程,由于@Transactional綁定管理的是主線程的事務(wù),而parallelStream開(kāi)啟的新的線程與主線程無(wú)關(guān)。因此,事務(wù)也就無(wú)效了。
此時(shí),將parallelStream改為普通的stream,事務(wù)可正?;貪L。這就提示我們,在使用基于@Transactional方式管理事務(wù)時(shí),慎重使用多線程處理。
問(wèn)題拓展
雖然parallelStream帶來(lái)了更高的性能,但也要區(qū)分場(chǎng)景進(jìn)行使用。即便是在不需要事務(wù)管理的情況下,如果parallelStream使用不當(dāng),也會(huì)造成同一時(shí)間對(duì)數(shù)據(jù)庫(kù)發(fā)起大量請(qǐng)求等問(wèn)題。
因此,在stream與parallelStream之間進(jìn)行選擇時(shí),還要考慮幾個(gè)問(wèn)題:
- 是否需要并行?數(shù)據(jù)量比較大,處理器核心數(shù)比較多的情況下才會(huì)有性能提升。
- 任務(wù)之間是否是獨(dú)立的,是否會(huì)引起任何競(jìng)態(tài)條件?比如:是否共享變量。
- 執(zhí)行結(jié)果是否取決于任務(wù)的調(diào)用順序?并行執(zhí)行的順序是不確定的。
小結(jié)
文章講述的Bug雖然簡(jiǎn)單,但如果不了解parallelStream與@Transactional注解的特性,還是很難排查的。而且也讓我們意識(shí)到,雖然Spring通過(guò)@Transactional將事務(wù)管理進(jìn)行了簡(jiǎn)化處理,但作為開(kāi)發(fā)者,還是需要深入了解一下它的基本運(yùn)作原理。不然,在排查bug時(shí),很容易踩坑。
到此這篇關(guān)于并行Stream與Spring事務(wù)相遇會(huì)發(fā)生什么?的文章就介紹到這了,更多相關(guān) Stream與Spring 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- spring-cloud-stream結(jié)合kafka使用詳解
- 解決spring 處理request.getInputStream()輸入流只能讀取一次問(wèn)題
- Spring Cloud Stream簡(jiǎn)單用法
- springboot 中 inputStream 神秘消失之謎(終破)
- Postgresql根據(jù)響應(yīng)數(shù)據(jù)反向?qū)崿F(xiàn)建表語(yǔ)句與insert語(yǔ)句的過(guò)程
- Springcloud整合stream,rabbitmq實(shí)現(xiàn)消息驅(qū)動(dòng)功能
- springMarchal集成xStream的完整示例代碼
- SpringCloud?Stream?整合RabbitMQ的基本步驟
相關(guān)文章
使用Spring MVC攔截器實(shí)現(xiàn)日志記錄的方法
本篇文章主要介紹了使用Spring MVC攔截器實(shí)現(xiàn)日志記錄的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-04-04
Javafx利用fxml變換場(chǎng)景的實(shí)現(xiàn)示例
本文主要介紹了Javafx利用fxml變換場(chǎng)景的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-07-07
Java內(nèi)存溢出的幾個(gè)區(qū)域總結(jié)(注意避坑!)
內(nèi)存溢出是指應(yīng)用系統(tǒng)中存在無(wú)法回收的內(nèi)存或使用的內(nèi)存過(guò)多,最終使得程序運(yùn)行要用到的內(nèi)存大于虛擬機(jī)能提供的最大內(nèi)存,下面這篇文章主要給大家介紹了關(guān)于Java內(nèi)存溢出的幾個(gè)區(qū)域,總結(jié)出來(lái)給大家提醒注意避坑,需要的朋友可以參考下2022-11-11
Mybatis-plus3.4.3下使用lambdaQuery報(bào)錯(cuò)解決
最近在使用lambdaQuery().eq(CommonUser::getOpenId, openId).one()進(jìn)行查詢報(bào)錯(cuò),本文主要介紹了Mybatis-plus3.4.3下使用lambdaQuery報(bào)錯(cuò)解決,具有一定的參考價(jià)值,感興趣的可以了解一下2024-07-07
java原生序列化和Kryo序列化性能實(shí)例對(duì)比分析
這篇文章主要介紹了java原生序列化和Kryo序列化性能實(shí)例對(duì)比分析,涉及Java和kryo序列化和反序列化相關(guān)實(shí)例,小編覺(jué)得很不錯(cuò),這里分享給大家,希望給大家一個(gè)參考。2017-10-10
詳解Kotlin中如何實(shí)現(xiàn)類似Java或C#中的靜態(tài)方法
Kotlin中如何實(shí)現(xiàn)類似Java或C#中的靜態(tài)方法,本文總結(jié)了幾種方法,分別是:包級(jí)函數(shù)、伴生對(duì)象、擴(kuò)展函數(shù)和對(duì)象聲明。這需要大家根據(jù)不同的情況進(jìn)行選擇。2017-05-05
springboot aspect通過(guò)@annotation進(jìn)行攔截的實(shí)例代碼詳解
這篇文章主要介紹了springboot aspect通過(guò)@annotation進(jìn)行攔截的方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08

