JPA多數(shù)據(jù)源分布式事務處理方案
前言
多數(shù)據(jù)源的事務處理是個老生常談的話題,跨兩個數(shù)據(jù)源的事務管理也算是分布式事務的范疇,在同一個JVM里處理多數(shù)據(jù)源的事務,比較經(jīng)典的處理方案是JTA(基于XA協(xié)議建模的java標準事務抽象)+XA(XA事務協(xié)議),常見的JTA實現(xiàn)框架有Atomikos、Bitronix、Narayana,Spring對這些框架都有組件封裝,基本可以做到開箱即用程度。本文除了分享XA事務方案外,提供了一種新的多數(shù)據(jù)源事務解決思路和視角。
問題背景
在解決mysql字段脫敏處理時,結合sharding-jdbc的脫敏組件功能,為了sql兼容和最小化應用改造,博主給出了一個多數(shù)據(jù)源融合的字段脫敏解決方案(只把包含脫敏字段表的操作走sharding-jdbc脫敏代理數(shù)據(jù)源)。這個方案解決了問題的同時,帶來了一個新的問題,數(shù)據(jù)源的事務是獨立的,正如我文中所述《JPA項目多數(shù)據(jù)源模式整合sharding-jdbc實現(xiàn)數(shù)據(jù)脫敏》,在spring上下文中,每個數(shù)據(jù)源對應一個獨立的事務管理器,默認的事務管理器的數(shù)據(jù)源就用業(yè)務本身的數(shù)據(jù)源,所以需要加密的業(yè)務使用時,需要指定@Transactional注解里的事務管理器名稱為脫敏對應的事務管理器名稱。簡單的業(yè)務場景這樣用也就沒有問題了,但是一般的業(yè)務場景總有一個事務覆蓋兩個數(shù)據(jù)源的操作,這個時候單指定哪個事務管理器都不行,so,這里需要一種多數(shù)據(jù)源的事務管理器。
XA事務方案
XA協(xié)議采用2PC(兩階段提交)的方式來管理分布式事務。XA接口提供資源管理器與事務管理器之間進行通信的標準接口。在JDBC的XA事務相關api抽象里,相關接口定義如下
XADataSource,XA協(xié)議數(shù)據(jù)源
public interface XADataSource extends CommonDataSource {
/**
* 嘗試建立物理數(shù)據(jù)庫連接,使用給定的用戶名和密碼。返回的連接可以在分布式事務中使用
*/
XAConnection getXAConnection() throws SQLException;
//省略getLogWriter等非關鍵方法
}XAConnection
public interface XAConnection extends PooledConnection {
/**
* 檢索一個{@code XAResource}對象,事務管理器將使用該對象管理該{@code XAConnection}對象在分布式事務中的事務行為
*/
javax.transaction.xa.XAResource getXAResource() throws SQLException;
}XAResource
public interface XAResource {
/**
* 提交xid指定的全局事務
*/
void commit(Xid xid, boolean onePhase) throws XAException;
/**
* 結束代表事務分支執(zhí)行的工作。資源管理器從指定的事務分支中分離XA資源,并讓事務完成。
*/
void end(Xid xid, int flags) throws XAException;
/**
* 通知事務管理器忽略此xid事務分支
*/
void forget(Xid xid) throws XAException;
/**
* 判斷是否同一個資源管理器
*/
boolean isSameRM(XAResource xares) throws XAException;
/**
* 指定xid事務準備階段
*/
int prepare(Xid xid) throws XAException;
/**
* 從資源管理器獲取準備好的事務分支的列表。事務管理器在恢復期間調用此方法,
* 以獲取當前處于準備狀態(tài)或初步完成狀態(tài)的事務分支的列表。
*/
Xid[] recover(int flag) throws XAException;
/**
* 通知資源管理器回滾代表事務分支完成的工作。
*/
void rollback(Xid xid) throws XAException;
/**
* 代表xid中指定的事務分支開始工作。
*/
void start(Xid xid, int flags) throws XAException;
//省略非關鍵方法
}相比較普通的事務管理,JDBC的XA協(xié)議管理多了一個XAResource資源管理器,XA事務相關的行為(開啟、準備、提交、回滾、結束)都由這個資源管理器來控制,這些都是框架內部的行為,體現(xiàn)在開發(fā)層面提供的數(shù)據(jù)源也變成了XADataSource。而JTA的抽象里,定義了UserTransaction、TransactionManager。想要使用JTA事務,必須先實現(xiàn)這兩個接口。所以,如果我們要使用JTA+XA控制多數(shù)據(jù)源的事務,在sprign boot里以Atomikos為例,
引入Atomikos依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>spring boot已經(jīng)幫我們把XA事務管理器自動裝載類定義好了,如:
創(chuàng)建JTA事務管理器
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class })
@ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class })
@ConditionalOnMissingBean(PlatformTransactionManager.class)
class AtomikosJtaConfiguration {
@Bean(initMethod = "init", destroyMethod = "shutdownWait")
@ConditionalOnMissingBean(UserTransactionService.class)
UserTransactionServiceImp userTransactionService(AtomikosProperties atomikosProperties,
JtaProperties jtaProperties) {
Properties properties = new Properties();
if (StringUtils.hasText(jtaProperties.getTransactionManagerId())) {
properties.setProperty("com.atomikos.icatch.tm_unique_name", jtaProperties.getTransactionManagerId());
}
properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir(jtaProperties));
properties.putAll(atomikosProperties.asProperties());
return new UserTransactionServiceImp(properties);
}
@Bean(initMethod = "init", destroyMethod = "close")
@ConditionalOnMissingBean(TransactionManager.class)
UserTransactionManager atomikosTransactionManager(UserTransactionService userTransactionService) throws Exception {
UserTransactionManager manager = new UserTransactionManager();
manager.setStartupTransactionService(false);
manager.setForceShutdown(true);
return manager;
}
@Bean
@ConditionalOnMissingBean(XADataSourceWrapper.class)
AtomikosXADataSourceWrapper xaDataSourceWrapper() {
return new AtomikosXADataSourceWrapper();
}
@Bean
JtaTransactionManager transactionManager(UserTransaction userTransaction, TransactionManager transactionManager,
ObjectProvidertransactionManagerCustomizers) {
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager);
transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(jtaTransactionManager));
return jtaTransactionManager;
}
}顯然,想要使用XA事務,除了需要提供UserTransaction、TransactionManager的實現(xiàn)。還必須要有一個XADataSource,而sharding-jdbc代理的數(shù)據(jù)源是DataSource的,我們需要將XADataSource包裝成普通的DataSource,spring已經(jīng)提供了一個AtomikosXADataSourceWrapper的XA數(shù)據(jù)源包裝器,而且在AtomikosJtaConfiguration里已經(jīng)注冊到Spring上下文中,所以我們在自定義數(shù)據(jù)源時可以直接注入包裝器實例,然后,因為是JPA環(huán)境,所以在創(chuàng)建EntityManagerFactory實例時,需要指定JPA的事務管理類型為JTA,綜上,普通的業(yè)務默認數(shù)據(jù)源配置如下:
/**
* @author: kl @kailing.pub
* @date: 2020/5/18
*/
@Configuration
@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})
public class DataSourceConfiguration{
@Primary
@Bean
public DataSource dataSource(AtomikosXADataSourceWrapper wrapper, DataSourceProperties dataSourceProperties) throws Exception {
MysqlXADataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(MysqlXADataSource.class).build();
return wrapper.wrapDataSource(dataSource);
}
@Primary
@Bean(initMethod = "afterPropertiesSet")
public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {
return factoryBuilder.dataSource(dataSource)
.packages(Constants.BASE_PACKAGES)
.properties(jpaProperties.getProperties())
.persistenceUnit("default")
.jta(true)
.build();
}
@Bean
@Primary
public EntityManager entityManager(EntityManagerFactory entityManagerFactory){
//必須使用SharedEntityManagerCreator創(chuàng)建SharedEntityManager實例,否則SimpleJpaRepository中的事務不生效
return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
}
}sharding-jdbc加密數(shù)據(jù)源和普通業(yè)務數(shù)據(jù)源其實是同一個數(shù)據(jù)源,只是走加解密邏輯的數(shù)據(jù)源需要被sharding-jdbc的加密組件代理一層,加上了加解密的處理邏輯。所以配置如下:
/**
* @author: kl @kailing.pub
* @date: 2020/5/18
*/
@Configuration
@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})
public class EncryptDataSourceConfiguration {
@Bean
public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {
return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());
}
@Bean(initMethod = "afterPropertiesSet")
public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {
return factoryBuilder.dataSource(dataSource)
.packages(Constants.BASE_PACKAGES)
.properties(jpaProperties.getProperties())
.persistenceUnit("encryptPersistenceUnit")
.jta(true)
.build();
}
@Bean
public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){
//必須使用SharedEntityManagerCreator創(chuàng)建SharedEntityManager實例,否則SimpleJpaRepository中的事務不生效
return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
}
}遇到問題1、
Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean.
解決問題:默認AtomikosXADataSourceWrapper包裝器初始化的數(shù)據(jù)源連接池最大為1,所以需要添加配置參數(shù)如:
spring.jta.atomikos.datasource.max-pool-size=20
遇到問題2、
XAER_INVAL: Invalid arguments (or unsupported command)
解決問題:這個是mysql實現(xiàn)XA的bug,僅當您在同一事務中多次訪問同一MySQL數(shù)據(jù)庫時,才會發(fā)生此問題,在mysql連接url加上如下參數(shù)即可,如:
spring.datasource.url = jdbc:mysql://127.0.0.1:3306/xxx?pinGlobalTxToPhysicalConnection=true
Mysql XA事務行為
在這個場景中,雖然是多數(shù)據(jù)源,但是底層鏈接的是同一個mysql數(shù)據(jù)庫,所以XA事務行為為,從第一個執(zhí)行的sql開始(并不是JTA事務begin階段),生成xid并XA START事務,然后XA END。第二個數(shù)據(jù)源的sql執(zhí)行時會判斷是否同一個mysql資源,如果是同一個則用剛生成的xid重新XA START RESUME,然后XA END,最終雖然在應用層是兩個DataSource,其實最后只會調用XA COMMIT一次。mysql驅動實現(xiàn)的XAResource的start如下:
public void start(Xid xid, int flags) throws XAException {
StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
commandBuf.append("XA START ");
appendXid(commandBuf, xid);
switch (flags) {
case TMJOIN:
commandBuf.append(" JOIN");
break;
case TMRESUME:
commandBuf.append(" RESUME");
break;
case TMNOFLAGS:
// no-op
break;
default:
throw new XAException(XAException.XAER_INVAL);
}
dispatchCommand(commandBuf.toString());
this.underlyingConnection.setInGlobalTx(true);
}第一次sql執(zhí)行時,flags=0,走的TMNOFLAGS邏輯,第二次sql執(zhí)行時,flags=134217728,走的TMRESUME,重新開啟事務的邏輯。以上是Mysql XA的真實事務邏輯,但是博主研究下來發(fā)現(xiàn),msyql xa并不支持XA START RESUME這種語句,而且有很多限制《Mysql XA交易限制》,所以在mysql數(shù)據(jù)庫使用XA事務時,最好了解下mysql xa的缺陷
鏈式事務方案
鏈式事務不是我首創(chuàng)的叫法,在spring-data-common項目的Transaction包下,已經(jīng)有一個默認實現(xiàn)ChainedTransactionManager,前文中《深入理解spring的@Transactional工作原理》已經(jīng)分析了Spring的事務抽象,由PlatformTransactionManager(事務管理器)、TransactionStatus(事務狀態(tài))、TransactionDefinition(事務定義)等形態(tài)組成,ChainedTransactionManager也是實現(xiàn)了PlatformTransactionManager和TransactionStatus。實現(xiàn)原理也很簡單,在ChainedTransactionManager內部維護了事務管理器的集合,通過代理編排真實的事務管理器,在事務開啟、提交、回滾時,都分別操作集合里的事務。以達到對多個事務的統(tǒng)一管理。這個方案比較簡陋,而且有缺陷,在提交階段,如果異常不是發(fā)生在第一個數(shù)據(jù)源,那么會存在之前的提交不會回滾,所以在使用ChainedTransactionManager時,盡量把出問題可能性比較大的事務管理器放鏈的后面(開啟事務、提交事務順序相反)。這里只是拋出了一種新的多數(shù)據(jù)源事務管理的思路,能用XA盡量用XA管理。
普通的業(yè)務默認數(shù)據(jù)源配置如下:
/**
* @author: kl @kailing.pub
* @date: 2020/5/18
*/
@Configuration
@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})
public class DataSourceConfiguration{
@Primary
@Bean
public DataSource dataSource(DataSourceProperties dataSourceProperties){
return dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
}
@Primary
@Bean(initMethod = "afterPropertiesSet")
public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {
return factoryBuilder.dataSource(dataSource)
.packages(Constants.BASE_PACKAGES)
.properties(jpaProperties.getProperties())
.persistenceUnit("default")
.build();
}
@Bean
@Primary
public EntityManager entityManager(EntityManagerFactory entityManagerFactory){
//必須使用SharedEntityManagerCreator創(chuàng)建SharedEntityManager實例,否則SimpleJpaRepository中的事務不生效
return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
}
@Primary
@Bean
public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){
JpaTransactionManager txManager = new JpaTransactionManager();
txManager.setEntityManagerFactory(entityManagerFactory);
return txManager;
}
}sharding-jdbc加密數(shù)據(jù)源配置如下:
/**
* @author: kl @kailing.pub
* @date: 2020/5/18
*/
@Configuration
@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})
public class EncryptDataSourceConfiguration {
@Bean
public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {
return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());
}
@Bean(initMethod = "afterPropertiesSet")
public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {
return factoryBuilder.dataSource(dataSource)
.packages(Constants.BASE_PACKAGES)
.properties(jpaProperties.getProperties())
.persistenceUnit("encryptPersistenceUnit")
.build();
}
@Bean
public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){
//必須使用SharedEntityManagerCreator創(chuàng)建SharedEntityManager實例,否則SimpleJpaRepository中的事務不生效
return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
}
@Bean
public PlatformTransactionManager chainedTransactionManager(PlatformTransactionManager transactionManager) throws SQLException {
JpaTransactionManager encryptTransactionManager = new JpaTransactionManager();
encryptTransactionManager.setEntityManagerFactory(encryptEntityManagerFactory());
//使用鏈式事務管理器包裝真正的transactionManager、txManager事務
ChainedTransactionManager chainedTransactionManager = new ChainedTransactionManager(encryptTransactionManager,transactionManager);
return chainedTransactionManager;
}
}使用這種方案,在涉及到多數(shù)據(jù)源的業(yè)務時,需要指定使用哪個事務管理器,如:
@PersistenceContext(unitName = "encryptPersistenceUnit")
private EntityManager entityManager;
@PersistenceContext
private EntityManager manager;
@Transactional(transactionManager = "chainedTransactionManager")
public AccountModel save(AccountDTO dto){
AccountModel accountModel = AccountMapper.INSTANCE.dtoTo(dto);
entityManager.persist(accountModel);
entityManager.flush();
AccountModel accountMode2 = AccountMapper.INSTANCE.dtoTo(dto);
manager.persist(accountMode2);
manager.flush();
return accountModel;
}結語
綜上,對于JPA的多數(shù)據(jù)源分布式事務處理,JTA的事務管理器經(jīng)過spring boot的封裝已經(jīng)可以開箱即用了。重點在JPA環(huán)境下,需要指定EntityManagerFactory的事務使用JTA事務。另本文分享了一種鏈式事務編排的方式也可以應用在這種場景,但是特殊的場景下不能保證事務的完整性,所以博主推薦使用JtaTransactionManager,有符合的場景也可以試試ChainedTransactionManager。
以上就是JPA多數(shù)據(jù)源分布式事務處理方案的詳細內容,更多關于JPA多數(shù)據(jù)源分布式事務處理的資料請關注腳本之家其它相關文章!
相關文章
使用JAVA+Maven+TestNG框架實現(xiàn)超詳細Appium測試安卓真機教程
這篇文章主要介紹了使用JAVA+Maven+TestNG框架實現(xiàn)超詳細Appium測試安卓真機教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01
springboot配置數(shù)據(jù)庫密碼特殊字符報錯的解決
這篇文章主要介紹了springboot配置數(shù)據(jù)庫密碼特殊字符報錯的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02
SpringMVC 中HttpMessageConverter簡介和Http請求415 的問題
本文介紹且記錄如何解決在SpringMVC 中遇到415 Unsupported Media Type 的問題,并且順便介紹Spring MVC的HTTP請求信息轉換器HttpMessageConverter2016-07-07
解決maven打包排除類不生效maven-compiler-plugin問題
總結:在Spring Boot項目B中作為項目A的依賴時,排除啟動類不生效的原因是被其他類引用或父POM引入,解決方法是跳過test編譯或注釋掉@SpringBootTest(classes={BApplication.class})2024-11-11

