Spring TransactionalEventListener事務(wù)未提交讀取不到數(shù)據(jù)的解決
一、背景
業(yè)務(wù)處理過程,發(fā)現(xiàn)了以下問題,代碼一是原代碼能正常執(zhí)行,代碼二是經(jīng)過迭代一次非正常執(zhí)行代碼
- 代碼一:以下代碼開啟線程后,代碼正常執(zhí)行
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
@Transactional
public Long test() {
// ......
// 插入記錄
Long studentId = studentService.insert(student);
// 異步線程
writeStatisticsData(studentId);
return studentId;
}
private void writeStatisticsData(Long studentId) {
executor.execute(() -> {
Student student = studentService.findById(studentId);
//........
});
}
- 代碼二:以下代碼開啟線程后,代碼不正常執(zhí)行
@Transactional
public Long test() {
// ......
// 插入記錄
Long studentId = studentService.insert(student);
// 異步線程
writeStatisticsData(studentId);
// 插入學(xué)生地址記錄
Long addressId = addressService.insert(address);
return studentId;
}
private void writeStatisticsData(Long studentId) {
executor.execute(() -> {
Student student = studentService.findById(studentId);
//........
});
}
二、問題分析
這里使用了spring事務(wù),顯然需要考慮事務(wù)的隔離級別
2.1、mysql隔離級別
查看mysql隔離級別
SELECT @@tx_isolation; READ-COMMITTED
讀提交,即在事務(wù)A插入數(shù)據(jù)過程中,事務(wù)B在A提交之前讀取A插入的數(shù)據(jù)讀取不到,而B在A提交之后再去讀就會讀取到A插入的數(shù)據(jù),也即Read Committed不能保證在一個事務(wù)中每次讀都能讀到相同的數(shù)據(jù),因為在每次讀數(shù)據(jù)之后其他并發(fā)事務(wù)可能會對剛才讀到的數(shù)據(jù)進行修改。
2.2、問題原因分析
- 代碼一正常運行的原因
由于mysql事務(wù)的隔離級別是讀提交,test方法在開啟異步線程后,異步線程也開啟了事務(wù),同時以讀者身份去讀 test 方法中插入的 student 記錄,但此時 test 方法已經(jīng)提交了事務(wù),所以可以讀取到 student 記錄(即在異步方法中可以讀取到 student 記錄),但此代碼有風(fēng)險,若事務(wù)提交的時間晚一點,異步線程也有可能讀取不到 student 記錄。
- 代碼二不能正常運行的原因
經(jīng)過上面分析,很明顯異步方法中不能讀取到 student 記錄,由于代碼二在異步線程下面又執(zhí)行了其他操作,延時了test方法中事務(wù)的提交,所以代碼二不能正常運行。
三、解決問題方案
解決思路是在事務(wù)提交后再做其他的處理(如異步發(fā)消息處理等),這里還是從Spring執(zhí)行事務(wù)的過程中入手,Spring事務(wù)的處理過程不再分析,這里直接看Spring事務(wù)增強器TransactionInterceptor的核心處理流程,源碼如下:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
// 獲取事務(wù)屬性
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
//加載配置中配置的TransactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 聲明式事務(wù)的處理
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
//......
retVal = invocation.proceedWithInvocation();
//......
commitTransactionAfterReturning(txInfo);
return retVal;
} else {
// 編程式事務(wù)的處理......
}
//......
}
這里主要看聲明式事務(wù)的處理,因為編程式事務(wù)的處理及提交都是用戶在編碼中進行控制。在聲明式事務(wù)處理中,當方法執(zhí)行完后,會執(zhí)行 commitTransactionAfterReturning 方法來進行提交事務(wù),該方法在 TransactionAspectSupport 類中,源碼如下:
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
再看 commit 方法,該方法在 AbstractPlatformTransactionManager 類中,源碼如下:
public final void commit(TransactionStatus status) throws TransactionException {
// 這里省略很多代碼,如事務(wù)回滾......
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
// 提交事務(wù)
doCommit(status);
}
//......
} catch (......) {
// 事務(wù)異常處理......
}
try {
// 事務(wù)提交成功后的處理-----這里是重點
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerAfterCommit();
}
}
最終會走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中
public static void triggerAfterCommit() {
invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
synchronization.afterCommit();
}
}
}
上面會把緩存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按順序來執(zhí)行 afterCommit 方法,其中 TransactionSynchronization 以集合形式緩存在 TransactionSynchronizationManager 的 ThreadLocal 中。
3.1、方式一
經(jīng)過上面分析,只需要代碼中重新生成個 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解決方案,如下:
private void writeStatisticsData(Long studentId) {
if(TransactionSynchronizationManager.isActualTransactionActive()) {
// 當前存在事務(wù)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
executor.execute(() -> {Student student = studentService.findById(studentId);
//........
});
}});
} else {
// 當前不存在事務(wù)
executor.execute(() -> {Student student = studentService.findById(studentId);
//........
});
}
}
3.2、方式二
使用 @TransactionalEventListener 結(jié)合 Spring事件監(jiān)聽機制,該注解自從Spring4.2版本開始有的,如下:
// 事件
public class StudentEvent extends ApplicationEvent {
public StudentEvent(Long studentId) {
super(studentId);
}
}
// 監(jiān)聽器
public class StudentEventListener{
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void writeStatisticsData(StudentEvent studentEvent) {
executor.execute(() -> {
Student student = studentService.findById(studentEvent.getSource());
//........
});
}
}
@Service
public class StudentService {
// Spring4.2之后,ApplicationEventPublisher自動被注入到容器中,采用Autowired即可獲取
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Transactional
public Long test() {
// ......
// 插入記錄
Long studentId = studentService.insert(student);
// 發(fā)布事件
applicationEventPublisher.publishEvent(new StudentEvent(studentId));
// 插入學(xué)生地址記錄
Long addressId = addressService.insert(address);
return studentId;
}
}
原理分析
Spring Bean在加載配置文件時,會使用 AnnotationDrivenBeanDefinitionParser 來解析 annotation-driven 標簽,如下:
public class TxNamespaceHandler extends NamespaceHandlerSupport {
//......
@Override
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
}
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
// 重點——將TransactionalEventListenerFactory加入到容器中
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode");
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerTransactionAspect(element, parserContext);
}
else {
// mode="proxy"
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null;
}
private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
RootBeanDefinition def = new RootBeanDefinition();
def.setBeanClass(TransactionalEventListenerFactory.class);
parserContext.registerBeanComponent(new BeanComponentDefinition(def,
TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
}
}
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
//省略部分代碼......
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
}
}
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
//省略部分代碼......
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 事務(wù)存在時,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存集合中
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
} else if (this.annotation.fallbackExecution()) {
//.......
}
processEvent(event);
} else {
// 當前不存在事務(wù)什么也不做
}
}
上述 @TransactionalEventListener 本質(zhì)上是一個 @EventListener,TransactionalEventListenerFactory類會將每一個掃描到的方法有TransactionalEventListener注解包裝成ApplicationListenerMethodTransactionalAdapter對象,通過ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若當前存在事務(wù),就會生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 結(jié)合 Spring事件監(jiān)聽機制,并使用到異步方式感覺有點別扭,這里是為了說明問題)。
四、使用案例
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5,
new ThreadFactoryBuilder().setDaemon(false).setNamePrefix("execApiCache").build());
@Override
@Transactional(rollbackFor = Exception.class)
public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) {
// 數(shù)據(jù)庫代碼...
// 異步代碼
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
log.warn("afterCommit...");
executorService.execute(() -> {
// 異步業(yè)務(wù)
execApiCache(api);
});
}});
return ResultUtil.buildSucc();
}
Ps:setDaemon(false) 注意這里守護線程標記必須設(shè)置為 false,否則主線程執(zhí)行完,異步線程沒執(zhí)行完的話,異步線程會馬上被中斷、關(guān)閉,所以這里不能設(shè)置成守護(用戶)線程。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
springMVC如何對輸入數(shù)據(jù)校驗實現(xiàn)代碼
數(shù)據(jù)的校驗是交互式網(wǎng)站一個不可或缺的功能,數(shù)據(jù)驗證分為客戶端驗證和服務(wù)器端驗證,這篇文章主要介紹了springMVC如何對輸入數(shù)據(jù)校驗,需要的朋友可以參考下2020-10-10
redis分布式鎖RedissonLock的實現(xiàn)細節(jié)解析
這篇文章主要介紹了redis分布式鎖RedissonLock的實現(xiàn)細節(jié)解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06
Security框架:如何使用CorsFilter解決前端跨域請求問題
這篇文章主要介紹了Security框架:如何使用CorsFilter解決前端跨域請求問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11
Java使用itext5實現(xiàn)PDF表格文檔導(dǎo)出
這篇文章主要介紹了Java使用itext5實現(xiàn)PDF表格文檔導(dǎo)出,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01
springboot配置druid多數(shù)據(jù)源的示例代碼
這篇文章主要介紹了springboot配置druid多數(shù)據(jù)源的示例代碼,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-09-09
Spring Data JPA實現(xiàn)排序與分頁查詢超詳細流程講解
在介紹Spring Data JPA的時候,我們首先認識下Hibernate。Hibernate是數(shù)據(jù)訪問解決技術(shù)的絕對霸主,使用O/R映射技術(shù)實現(xiàn)數(shù)據(jù)訪問,O/R映射即將領(lǐng)域模型類和數(shù)據(jù)庫的表進行映射,通過程序操作對象而實現(xiàn)表數(shù)據(jù)操作的能力,讓數(shù)據(jù)訪問操作無須關(guān)注數(shù)據(jù)庫相關(guān)的技術(shù)2022-10-10
Java泛型模擬scala實現(xiàn)自定義ArrayList方式
這篇文章主要介紹了Java泛型模擬scala實現(xiàn)自定義ArrayList方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10

