SpringBoot中手動開啟數據庫事務的四種方式
概要
某些情況下我們可能需要手動開啟事務,比如由多個業(yè)務組合的功能,其中某一段業(yè)務報錯我們需要進行回滾操作,或者是使用數據庫事務實現分布式鎖。那么該如何開啟事務呢。
開啟事務
方式一:使用@Transactional注解,Spring會自動幫我們管理事務,包括開啟事務、提交事務、回滾事務。
方式二:從數據源DataSource中獲取一個Connection,DataSource是自動裝配的,SpringBoot默認使用的是HikariDataSource。將Connection自動提交設置為false,用此Connection執(zhí)行業(yè)務SQL,然后提交事務、回滾事務。
方式三:借助Spring中的事務管理器PlatformTransactionManager來開啟事務。
方式四:使用TransactionTemplate(自動注入即可),調用其execute方法來執(zhí)行業(yè)務邏輯。
PlatformTransactionManager
方式一是自動管理事務。方式二雖然能手動管理事務,但實際操作起來不太優(yōu)雅。方式四本質上還是方式三只不過把開啟事務、提交事務、回滾事務做了封裝,通過lambda函數回調執(zhí)行我們的業(yè)務,可以認為還是自動管理了事務,這里重點介紹方式三。
先看一段代碼和運行效果
@Resource
private UserMapper userMapper;
@Resource
private PlatformTransactionManager transactionManager;
@GetMapping(value = "/transaction", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> transaction() throws InterruptedException {
TransactionStatus transaction = null;
try {
//開啟事務
transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
User user = userMapper.findById(1L);
System.out.println("更新前:" + user);
user.setAge(28);
userMapper.updateById(user);
//事物未提交前其他線程讀取數據
Thread otherThread = new Thread(() -> {
User newUser = userMapper.findById(1L);
System.out.println("新線程獲取更新后的值:" + newUser);
});
otherThread.start();
otherThread.join();
User newUser = userMapper.findById(1L);
System.out.println("更新后:" + newUser);
} finally {
if (transaction != null) {
//提交事務
transactionManager.commit(transaction);
//其他線程讀取事務提交后的值
Thread otherThread = new Thread(() -> {
User user = userMapper.findById(1L);
System.out.println("新線程獲取事務提交后的值:" + user);
});
otherThread.start();
otherThread.join();
}
}
return ResponseEntity.ok("transaction");
}
這里使用mybatis來作為持久層框架,PlatformTransactionManager系統(tǒng)已經自動裝配,這里直接注入就可以使用。從運行效果來看手動開啟的事務是生效的
上面的測試代碼是開啟了一個新線程來觀察事務開啟后的效果,由于是新線程必然和當前線程是不會共享事務。但是這種寫法需要額外的線程來操作,下面是用mybatis的SqlSessionFactory來開啟一個新的SqlSession和當前線程不共享事務。
@Resource
private UserMapper userMapper;
@Resource
private PlatformTransactionManager transactionManager;
@Resource
private SqlSessionFactory sqlSessionFactory;
@GetMapping(value = "/transaction2", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> transaction2() {
TransactionStatus transaction = null;
DefaultSqlSession sqlSession = null;
try {
//開啟事務
transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
User user = userMapper.findById(1L);
System.out.println("更新前:" + user);
user.setAge(28);
userMapper.updateById(user);
//重新開啟一個連接
Configuration configuration = sqlSessionFactory.getConfiguration();
sqlSession = new DefaultSqlSession(
configuration,
configuration.newExecutor(
new JdbcTransaction(configuration.getEnvironment().getDataSource().getConnection()),
ExecutorType.SIMPLE),
true);
User user2 = sqlSession.getMapper(UserMapper.class).findById(1L);
System.out.println("新SqlSession獲取更新后的值:" + user2);
User newUser = userMapper.findById(1L);
System.out.println("更新后:" + newUser);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
if (transaction != null) {
//提交事務
transactionManager.commit(transaction);
if (sqlSession != null) {
User newUser = sqlSession.getMapper(UserMapper.class).findById(1L);
System.out.println("新SqlSession獲取事務提交后的值:" + newUser);
sqlSession.close();
}
}
}
return ResponseEntity.ok("transaction2");
}
從運行結果來看,事務提交后新sqlSession獲取的age應該為28,但仍然是18。這是因為同一個sqlSession執(zhí)行了相同的查詢sql語句時,后續(xù)的查詢會從緩存中拿值,我們需要在相應的mapper方法上加上@Options注解每次查詢前會清空緩然后走數據庫查詢。
@Options(flushCache = Options.FlushCachePolicy.TRUE)
@Select("select * from user where id = #{id}")
User findById(Long id);
技術細節(jié)
PlatformTransactionManager是如何實現手動管理事務的

PlatformTransactionManager的實現是JdbcTransactionManager,參考DataSourceTransactionManagerAutoConfiguration自動裝配類。如果引入了其他事務框架,如spring-boot-starter-data-jpa,那么PlatformTransactionManager實現會是JpaTransactionManager,可以參考HibernateJpaAutoConfiguration自動裝配類。不管是JdbcTransactionManager還是JpaTransactionManager在開啟事務時做的相關操作都是類似的,都是從數據源中獲取到一個新的Connection后將其自動提交設置為false。

當我們在代碼中執(zhí)行
transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
getTransaction方法在其抽象類AbstractPlatformTransactionManager中,源碼如下
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// 省略相關代碼。。。
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
//開啟新的事務
return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
// 省略相關代碼。。。
}
}
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);
this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
try {
//此處由實現類實現
doBegin(transaction, definition);
}
catch (RuntimeException | Error ex) {
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
throw ex;
}
prepareSynchronization(status, definition);
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
return status;
}doBegin是抽象方法,其實現在JdbcTransactionManager的父類DataSourceTransactionManager中實現。
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//從數據源中獲取一個新的連接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
if (definition.isReadOnly()) {
checkDefaultReadOnly(newCon);
}
// 把新的數據庫連接綁定到ConnectionHolder中
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 省略部分代碼。。。
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
// 正常情況下新獲取的連接都是自動提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
//將數據庫連接改為手動提交
con.setAutoCommit(false);
}
// 省略部分代碼。。。
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
//給當前線程的數據源綁定一個ConnectionHolder
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
// 省略部分代碼。。。
}
}源碼中可以看到新獲取的連接其自動提交被設置為false這樣就能實現手動提交事務了。且新的連接被TransactionSynchronizationManager(事務同步器)綁定到當前線程中,事務同步器在綁定數據時是用ThreadLocal來實現的,方便后續(xù)線程能直接拿到綁定的數據庫連接。
當使用mybatis的mapper接口或者sqlSession查詢以及更新數據時,是如何共享事務的。
mapper接口會變成一個代理對象(是一個MapperFactoryBean屬于工廠Bean),sql的執(zhí)行是交給代理對象中封裝的sqlSession來完成操作。sqlSession在執(zhí)行sql語句時最終會交給Executor。

Executor中會有個事務字段transaction是一個接口。在Spring環(huán)境下它的實現是SpringManagedTransaction。Executor執(zhí)行sql語句時會從transaction中獲取一個數據連接。

public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
LOGGER.debug(() -> "JDBC Connection [" + this.connection + "] will"
+ (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
}可以看到連接的獲取是通過工具類DataSourceUtils來操作完成的,這個是spring jdbc中所提供的工具類。
public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
try {
return doGetConnection(dataSource);
}
catch (SQLException ex) {
throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
}
catch (IllegalStateException ex) {
throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
}
}
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified");
//這里能看到連接是從事務同步器中拿的
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}
// Else we either got no holder or an empty thread-bound holder here.
// 省略相關代碼。。。
return con;
}由于前面開啟事務時已經給當前線程綁定了一個ConnectionHolder,這里就直接接能獲取到,這樣就實現了同一個線程中數據庫連接的共享。最后提交事務時是交給doCommit方法完成的。
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
// 最開始創(chuàng)建一個新的事務時txObject中已經綁定了ConnectionHolder
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw translateException("JDBC commit", ex);
}
}
事務提交之后從數據源中拿到的Connection自動提交要恢復為true。JdbcTransactionManager的操作是在父類DataSourceTransactionManager的doCleanupAfterCompletion方法中完成的。
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// Remove the connection holder from the thread, if exposed.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
// Reset connection.
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
// 恢復為自動提交
con.setAutoCommit(true);
}
DataSourceUtils.resetConnectionAfterTransaction(con,
txObject.getPreviousIsolationLevel(),
(txObject.isReadOnly() && !isDefaultReadOnly()));
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
// 省略部分代碼。。。
}總結
為何mybatis的sqlSession在執(zhí)行同一個查詢sql語句時后續(xù)會從緩存中拿值。前面說到sql語句的執(zhí)行會交給Executor,其查詢方法如下。
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler,
CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
// 如果設置了強制刷新緩存,每次執(zhí)行查詢時都會清空一遍緩存
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
// 如果從緩存中拿到了值就不從數據庫中查詢了
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}mybatis在掃描mapper接口時會默認讓查詢語句的刷新緩存為都為false,這里其實就是mybatis的一級緩存,屬于會話級別。當指定Options注解且其flushCache為true時會設置查詢語句要刷新緩存,如果是使用xml寫sql語句,相應的select標簽上指定flushCache屬性為true。
到此這篇關于SpringBoot中如何手動開啟數據庫事務的文章就介紹到這了,更多相關SpringBoot手動開啟數據庫事務內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SSM框架整合之Spring+SpringMVC+MyBatis實踐步驟
大家都知道Spring是一個輕量級的控制反轉(IoC)和面向切面(AOP)的容器框架,本文主要介紹三大框架的整合包含spring和mybatis的配置文件,還有spring-mvc的配置文件的詳細介紹,通過項目實踐步驟給大家詳細介紹,感興趣的朋友一起看看吧2021-06-06
RedisTemplate.opsForHash()用法簡介并舉例說明
redistemplate.opsforhash是RedisTemplate模板類中的一個方法,用于獲取操作哈希數據類型的接口,這篇文章主要給大家介紹了關于RedisTemplate.opsForHash()用法簡介并舉例說明的相關資料,需要的朋友可以參考下2024-06-06
MyBatisPlus中@TableField注解的基本使用
這篇文章主要介紹了MyBatisPlus中@TableField注解的基本使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07

