Spring?AOP實(shí)現(xiàn)聲明式事務(wù)機(jī)制源碼解析
一、聲明式全局事務(wù)
在Seata示例工程中,能看到@GlobalTransactional,如下方法示例:
@GlobalTransactional
public boolean purchase(long accountId, long stockId, long quantity) {
String xid = RootContext.getXID();
LOGGER.info("New Transaction Begins: " + xid);
boolean stockResult = reduceAccount(accountId,stockId, quantity);
if (!stockResult) {
throw new RuntimeException("賬號(hào)服務(wù)調(diào)用失敗,事務(wù)回滾!");
}
Long orderId = createOrder(accountId, stockId, quantity);
if (orderId == null || orderId <= 0) {
throw new RuntimeException("訂單服務(wù)調(diào)用失敗,事務(wù)回滾!");
}
return true;
}
purchase方法上加上此注解,即表示此方法內(nèi)的reduceAccount和createOrder兩個(gè)微服務(wù)調(diào)用也將加入到分布式事務(wù)中,即扣除賬戶(hù)余額與創(chuàng)建訂單將具有分布式事務(wù)的數(shù)據(jù)一致性保障能力。
了解 Spring 注解事務(wù)實(shí)現(xiàn)的話,應(yīng)該也能推測(cè)出,Seata 的事務(wù)能力也可能是基于 Spring 的 AOP 機(jī)制,給標(biāo)注了@GlobalTransactional 的方法做 AOP 增加,織入額外的邏輯以完成分布式事務(wù)的能力,偽代碼大致如下:
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
tx.begin(xxx);
...
purchase(xxx)//給purchase增加全局事務(wù)處理能力
...
tx.commit();
} catch (Exception exx) {
tx.rollback();
throw exx;
}
本篇就介紹Seata 如何使用 Spring AOP 來(lái)將注解變成分布式事務(wù)的代碼。
二、源碼
在上一篇《Seata1.6源碼全局事務(wù)注解@GlobalTransactional的識(shí)別》bean,并對(duì)這類(lèi) bean 添加GlobalTransactionalInterceptor,進(jìn)行 AOP 增強(qiáng),加入分布式事務(wù)的能力。本篇延續(xù)這個(gè)話題繼續(xù),梳理 AOP 增強(qiáng)的邏輯。
通過(guò)下邊的調(diào)用堆棧幫大家梳理出 在源碼AbstractAutoProxyCreator#wrapIfNecessary中有 createProxy的調(diào)用。
createProxy:443, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) wrapIfNecessary:344, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) wrapIfNecessary:307, GlobalTransactionScanner (io.seata.spring.annotation) postProcessAfterInitialization:293, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) applyBeanPostProcessorsAfterInitialization:455, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) initializeBean:1808, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) doCreateBean:620, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) createBean:542, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
AbstractAutoProxyCreator#createProxy其中new ProxyFactory()則是 AOP 的關(guān)鍵。
protected Object createProxy(Class<?> beanClass, @Nullable String beanName,
@Nullable Object[] specificInterceptors, TargetSource targetSource) {
if (this.beanFactory instanceof ConfigurableListableBeanFactory) {
// 為目標(biāo) Bean 的 BeanDefinition 對(duì)象設(shè)置一個(gè)屬性
// org.springframework.aop.framework.autoproxy.AutoProxyUtils.originalTargetClass -> 目標(biāo) Bean 的 Class 對(duì)象
AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass);
}
// <1> 創(chuàng)建一個(gè)代理工廠
ProxyFactory proxyFactory = new ProxyFactory();
// <2> 復(fù)制當(dāng)前 ProxyConfig 的一些屬性(例如 proxyTargetClass、exposeProxy)
proxyFactory.copyFrom(this);
/**
* <3> 判斷是否類(lèi)代理,也就是是否開(kāi)啟 CGLIB 代理
* 默認(rèn)配置下為 `false`,參考 {@link org.springframework.context.annotation.EnableAspectJAutoProxy}
*/
if (!proxyFactory.isProxyTargetClass()) {
/*
* <3.1> 如果這個(gè) Bean 配置了進(jìn)行類(lèi)代理,則設(shè)置為 `proxyTargetClass` 為 `true`
*/
if (shouldProxyTargetClass(beanClass, beanName)) {
proxyFactory.setProxyTargetClass(true);
}
else {
/*
* <3.2> 檢測(cè)當(dāng)前 Bean 實(shí)現(xiàn)的接口是否包含可代理的接口
* 如沒(méi)有實(shí)現(xiàn),則將 `proxyTargetClass` 設(shè)為 `true`,表示需要進(jìn)行 CGLIB 提升
*/
evaluateProxyInterfaces(beanClass, proxyFactory);
}
}
/*
* <4> 對(duì)入?yún)⒌?Advisor 進(jìn)一步處理,因?yàn)槠渲锌赡苓€存在 Advice 類(lèi)型,需要將他們包裝成 DefaultPointcutAdvisor 對(duì)象
* 如果配置了 `interceptorNames` 攔截器,也會(huì)添加進(jìn)來(lái)
*/
Advisor[] advisors = buildAdvisors(beanName, specificInterceptors);
// <5> 代理工廠添加 Advisor 數(shù)組
proxyFactory.addAdvisors(advisors);
// <6> 代理工廠設(shè)置 TargetSource 對(duì)象
proxyFactory.setTargetSource(targetSource);
// <7> 對(duì) ProxyFactory 進(jìn)行加工處理,抽象方法,目前沒(méi)有子類(lèi)實(shí)現(xiàn)
customizeProxyFactory(proxyFactory);
proxyFactory.setFrozen(this.freezeProxy);
// <8> 是否這個(gè) AdvisedSupport 配置管理器已經(jīng)過(guò)濾過(guò)目標(biāo)類(lèi)(默認(rèn)為 false)
if (advisorsPreFiltered()) {
// 設(shè)置 `preFiltered` 為 `true`
// 這樣 Advisor 們就不會(huì)根據(jù) ClassFilter 進(jìn)行過(guò)濾了,而直接通過(guò) MethodMatcher 判斷是否處理被攔截方法
proxyFactory.setPreFiltered(true);
}
// Use original ClassLoader if bean class not locally loaded in overriding class loader
ClassLoader classLoader = getProxyClassLoader();
if (classLoader instanceof SmartClassLoader && classLoader != beanClass.getClassLoader()) {
classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
}
// <9> 通過(guò) ProxyFactory 代理工廠創(chuàng)建代理對(duì)象
return proxyFactory.getProxy(getProxyClassLoader());
}
上邊源碼讀起來(lái)很生硬,對(duì)于我們使用來(lái)梳理核心源碼流程來(lái)說(shuō),留意 AOP 實(shí)現(xiàn)的幾個(gè)關(guān)鍵要素即可:
- 配置target,被代理者(類(lèi)或方法中有標(biāo)注@GlobalTransactional的bean),最終還是要調(diào)用他么的方法
- 配置接口,即代理要具備的功能
- 配置額外的切面 addAdvisors,這里是指定
GlobalTransactionalInterceptor - 根據(jù)ClassLoader 類(lèi)加載器創(chuàng)建代理
由此我們可以推測(cè)中分布式事務(wù)的邏輯是在 GlobalTransactionalInterceptor 中,核心邏輯的實(shí)現(xiàn)應(yīng)該就是invoke中,我們從GlobalTransactionalInterceptor#invoke源碼中理一理:
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 獲取方法上的@GlobalTransactional注解中的內(nèi)容
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
// 獲取方法上的@GlobalLock注解中的內(nèi)容
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
//判斷是否禁用或者降級(jí)狀態(tài)
boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
//構(gòu)建事務(wù)描述信息,這些基礎(chǔ)配置信息很重要
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.rollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes(),
globalTransactionalAnnotation.lockStrategyMode());
} else {
transactional = this.aspectTransactional;
}
//若是@GlobalTransactional
return handleGlobalTransaction(methodInvocation, transactional);
} else if (globalLockAnnotation != null) {
//若是@GlobalLock
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
handleGlobalTransaction中開(kāi)始了重點(diǎn),transactionalTemplate從其名字可知,這是模板方法模式,new TransactionalExecutor() 中 getTransactionInfo是在構(gòu)建事務(wù)的一些基礎(chǔ)信息,execute()中則是指定了事務(wù)目標(biāo)方法(如purchase方法),
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final AspectTransactional aspectTransactional) throws Throwable {
boolean succeed = true;
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
...
@Override
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
int timeout = aspectTransactional.getTimeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
...
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
...
}
} finally {
if (ATOMIC_DEGRADE_CHECK.get()) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
execute方法中的內(nèi)容是重點(diǎn)
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
//...
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
//若全局事務(wù)上下文未就緒則new DefaultGlobalTransaction();
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
//2. 開(kāi)啟全局事務(wù),
// 2.1 triggerBeforeBegin()
// 2.2 會(huì)跟TC通信獲取全局事務(wù)ID:xid,
// 2.3 RootContext.bind(xid);
// 2.4 triggerAfterBegin()的事件通知調(diào)用
beginTransaction(txInfo, tx);
Object rs;
try {
// 執(zhí)行我們的事務(wù)方法如`purchase`方法
rs = business.execute();
} catch (Throwable ex) {
// 3. 遇到 business exception 則回滾
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. 提交事務(wù),觸發(fā)事件回調(diào)
// 4.1 triggerBeforeCommit();
// 4.2 tx.commit();與TC通信提交事務(wù),內(nèi)部默認(rèn)是有5次重試機(jī)會(huì)
// 4.3 triggerAfterCommit();
commitTransaction(tx, txInfo);
//返回結(jié)果
return rs;
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
//結(jié)束后的回調(diào)
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
三、小結(jié):
本篇梳理了引入seata-spring-boot-starter模塊后,其內(nèi)部會(huì)通過(guò)的自動(dòng)裝配機(jī)制會(huì)在SeataAutoConfiguration類(lèi)中,掃描具有@GlobalTransactional全局事務(wù)注解的類(lèi)和方法的 bean,并通過(guò)ProxyFactory機(jī)制對(duì)這類(lèi) bean 進(jìn)行AOP代理, 添加GlobalTransactionalInterceptor,在其內(nèi)部invoke中通過(guò)transactionalTemplate加入分布式事務(wù)的能力:
- 開(kāi)啟事務(wù)與 TC 進(jìn)行通信,獲取 xid ,注入事務(wù)上下文
- 調(diào)用目標(biāo)方法
- 之后根據(jù)結(jié)果是否正常執(zhí)行二階段的提交或回滾
但這里僅僅是 TM 的能力,仍未到RM的職能邊界。
以上就是Spring AOP實(shí)現(xiàn)聲明式事務(wù)機(jī)制源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Spring AOP聲明式事務(wù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Spring事務(wù)框架之TransactionStatus源碼解析
- Spring事務(wù)控制策略及@Transactional失效問(wèn)題解決避坑
- Spring強(qiáng)大事務(wù)兼容數(shù)據(jù)庫(kù)多種組合解決業(yè)務(wù)需求
- Spring事務(wù)@Transactional注解四種不生效案例場(chǎng)景分析
- Spring學(xué)習(xí)JdbcTemplate數(shù)據(jù)庫(kù)事務(wù)參數(shù)
- Spring框架JdbcTemplate數(shù)據(jù)庫(kù)事務(wù)管理完全注解方式
- Spring事務(wù)框架之TransactionDefinition源碼解析
相關(guān)文章
SpringMVC之簡(jiǎn)單的增刪改查示例(SSM整合)
本篇文章主要介紹了SpringMVC之簡(jiǎn)單的增刪改查示例(SSM整合),這個(gè)例子是基于SpringMVC+Spring+Mybatis實(shí)現(xiàn)的。有興趣的可以了解一下。2017-03-03
兩個(gè)List集合取相同重復(fù)數(shù)據(jù)的方法
今天小編就為大家分享一篇關(guān)于兩個(gè)List集合取相同重復(fù)數(shù)據(jù)的方法,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12
java實(shí)現(xiàn)解析Cron時(shí)間表達(dá)式為中文描述
這篇文章主要為大家詳細(xì)介紹了java如何實(shí)現(xiàn)解析Cron時(shí)間表達(dá)式為中文描述,文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解下2023-11-11
如何用120行Java代碼寫(xiě)一個(gè)自己的區(qū)塊鏈
這篇文章就是幫助你使用 Java 語(yǔ)言來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的區(qū)塊鏈,用不到 120 行代碼來(lái)揭示區(qū)塊鏈的原理,感興趣的就一起來(lái)了解一下2019-06-06
解決Spring boot 嵌入的tomcat不啟動(dòng)問(wèn)題
這篇文章主要介紹了解決Spring boot 嵌入的tomcat不啟動(dòng)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-10-10
詳解Java項(xiàng)目中讀取properties文件
本篇文章主要介紹了Java項(xiàng)目中讀取properties文件,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2016-12-12

