Seata AT模式如何實(shí)現(xiàn)行鎖詳解
前言
我們在很多博客中都有發(fā)現(xiàn),Seata AT模式里面的全局鎖其實(shí)是行鎖,這也是Seata AT模式和XA模式在鎖粒度上的最大區(qū)別。我們可以在官網(wǎng)看到這樣一個例子:
兩個全局事務(wù) tx1 和 tx2,分別對 a 表的 m 字段進(jìn)行更新操作,m 的初始值 1000。
tx1 先開始,開啟本地事務(wù),拿到本地鎖,更新操作 m = 1000 - 100 = 900。本地事務(wù)提交前,先拿到該記錄的 全局鎖 ,本地提交釋放本地鎖。 tx2 后開始,開啟本地事務(wù),拿到本地鎖,更新操作 m = 900 - 100 = 800。本地事務(wù)提交前,嘗試拿該記錄的 全局鎖 ,tx1 全局提交前,該記錄的全局鎖被 tx1 持有,tx2 需要重試等待 全局鎖 。

tx1 二階段全局提交,釋放 全局鎖 。tx2 拿到 全局鎖 提交本地事務(wù)。

如果 tx1 的二階段全局回滾,則 tx1 需要重新獲取該數(shù)據(jù)的本地鎖,進(jìn)行反向補(bǔ)償?shù)母虏僮?,?shí)現(xiàn)分支的回滾。
此時,如果 tx2 仍在等待該數(shù)據(jù)的 全局鎖,同時持有本地鎖,則 tx1 的分支回滾會失敗。分支的回滾會一直重試,直到 tx2 的 全局鎖 等鎖超時,放棄 全局鎖 并回滾本地事務(wù)釋放本地鎖,tx1 的分支回滾最終成功。
因?yàn)檎麄€過程 全局鎖 在 tx1 結(jié)束前一直是被 tx1 持有的,所以不會發(fā)生 臟寫 的問題。
那么你知道Seata AT模式是如何實(shí)現(xiàn)行鎖的嘛?為了搞明白AT模式到底是怎么獲取全局鎖的,我們深入源碼來看看。
如何加鎖
為了證實(shí)全局鎖就是我們所說的行鎖,經(jīng)過一番尋找,我在BaseTransactionalExecutor類中的prepareUndoLog()方法中找到了這樣一段代碼:
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
if (null != lockKeys) {
connectionProxy.appendLockKey(lockKeys);
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}
- 如果是刪除的SQL,那么通過
beforeImage生成行鎖標(biāo)記,否則通過afterImage生成行鎖標(biāo)記;
比如表名wallet_tbl,里面有一個主鍵id值為1,那么最終生成的lockKeys為wallet_tbl:1,如果有多行記錄id值分別為1、2、3,那么最終生成的lockKeys為wallet_tbl:1,2,3;多個主鍵索引的話使用_連接。所以我們可以總結(jié)出lockKeys的生成規(guī)則為:tableName:1_A,2_B,3_C,1、2、3、A、B、C分別為主鍵索引的值。
此時還沒有真正地拿到鎖,只是生成一個鎖的標(biāo)記。真正地上鎖需要查看ConnectionProxy.register()方法:
private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
context.setBranchId(branchId);
}
branchRegister()方法就是RM向TC進(jìn)行分支注冊,同時會申請行鎖。那么獲取行鎖的核心代碼應(yīng)該就是在TC端了,我們順著branchRegister()邏輯一路找到BranchSession.lock():
public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
if (this.getBranchType().equals(BranchType.AT)) {
// 只有AT模式需要獲取行鎖
return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
}
return true;
}
下面就要真正地開始進(jìn)入LockerManager來申請鎖了:
@Override
public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
if (branchSession == null) {
throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
}
String lockKey = branchSession.getLockKey();
if (StringUtils.isNullOrEmpty(lockKey)) {
// no lock
return true;
}
// get locks of branch
// 將lockKey解析成多行RowLock
List<RowLock> locks = collectRowLocks(branchSession);
if (CollectionUtils.isEmpty(locks)) {
// no lock
return true;
}
return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
}
這里做了一步將lockKey解析成多行RowLock,根據(jù)上面的tableName:1_A,2_B,3_C規(guī)則,最終解析成3個RowLock對象:{tableName,1_A},{tableName,2_B},{tableName,3_C}
最終我們追蹤到最后一個關(guān)鍵方法LockStoreDataBaseDAO.acquireLock():
@Override
public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
Set<String> dbExistedRowKeys = new HashSet<>();
boolean originalAutoCommit = true;
// 如果有多行鎖,那么先去重
if (lockDOs.size() > 1) {
lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
}
try {
conn = lockStoreDataSource.getConnection();
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
List<LockDO> unrepeatedLockDOs = lockDOs;
?
//check lock
if (!skipCheckLock) {
?
boolean canLock = true;
// 查詢是否已經(jīng)存在行鎖
// "select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc"
// in里面最多限制1000個
String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
ps = conn.prepareStatement(checkLockSQL);
for (int i = 0; i < lockDOs.size(); i++) {
ps.setString(i + 1, lockDOs.get(i).getRowKey());
}
rs = ps.executeQuery();
String currentXID = lockDOs.get(0).getXid();
boolean failFast = false;
while (rs.next()) {
String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
// 如果發(fā)現(xiàn)有其他分布式事務(wù)和當(dāng)前申請行鎖的數(shù)據(jù)一致,那么加鎖失敗
if (!StringUtils.equals(dbXID, currentXID)) {
if (LOGGER.isInfoEnabled()) {
String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
}
if (!autoCommit) {
int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
if (status == LockStatus.Rollbacking.getCode()) {
failFast = true;
}
}
// 加鎖失敗
canLock = false;
break;
}
?
dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
}
// 加鎖失敗,回滾拋異常
if (!canLock) {
conn.rollback();
if (failFast) {
throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
}
return false;
}
// 如果是同一個分布式事務(wù)中申請行鎖,那么剔除重復(fù)的鎖數(shù)據(jù)
if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
.collect(Collectors.toList());
}
// 如果剔除后不需要再補(bǔ)充行鎖,那么直接返回申請成功
if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
conn.rollback();
return true;
}
}
?
// 申請行鎖,分1行和多行兩種情況
if (unrepeatedLockDOs.size() == 1) {
LockDO lockDO = unrepeatedLockDOs.get(0);
if (!doAcquireLock(conn, lockDO)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
}
conn.rollback();
return false;
}
} else {
if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
}
conn.rollback();
return false;
}
}
conn.commit();
return true;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(rs, ps);
if (conn != null) {
try {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
} catch (SQLException e) {
}
}
}
}
1.先通過查詢語句檢查是否存在鎖沖突,鎖沖突的話,就直接失敗拋異常;
2.不存在鎖沖突,檢查是否鎖重入,重入的話,補(bǔ)充行鎖;
3.添加行鎖;
檢查鎖沖突的SQL語句如下:
select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc
添加行鎖SQL語句如下:
insert into lock_table (row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, now(), now(), ?)
為什么是行鎖
根據(jù)上面加鎖的邏輯,我們發(fā)現(xiàn)一直比較的都是row_key這個主鍵,那么為什么row_key代表的是行鎖呢?這個問題就要回到row_key是如何產(chǎn)生的:
protected LockDO convertToLockDO(RowLock rowLock) {
LockDO lockDO = new LockDO();
lockDO.setBranchId(rowLock.getBranchId());
lockDO.setPk(rowLock.getPk());
lockDO.setResourceId(rowLock.getResourceId());
// row_key的生成
lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk()));
lockDO.setXid(rowLock.getXid());
lockDO.setTransactionId(rowLock.getTransactionId());
lockDO.setTableName(rowLock.getTableName());
return lockDO;
}
根據(jù)上面代碼,我們很清楚地了解到,row_key是由resource_id、tableName、pk這三個字段連接生成的,也就意味著row_key是代表表里面的具體一行數(shù)據(jù),也就是我們的行記錄,所以我們確信AT模式的全局鎖其實(shí)就是行鎖。
以上就是Seata AT模式如何實(shí)現(xiàn)行鎖詳解的詳細(xì)內(nèi)容,更多關(guān)于Seata AT模式實(shí)現(xiàn)行鎖的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java封裝實(shí)現(xiàn)自適應(yīng)的單位轉(zhuǎn)換工具類
這篇文章主要為大家詳細(xì)介紹了如何使用Java封裝實(shí)現(xiàn)一個自適應(yīng)的單位轉(zhuǎn)換工具類,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-03-03
SpringCloud 服務(wù)負(fù)載均衡和調(diào)用 Ribbon、OpenFeign的方法
這篇文章主要介紹了SpringCloud 服務(wù)負(fù)載均衡和調(diào)用 Ribbon、OpenFeign的方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09
Java詳細(xì)分析Lambda表達(dá)式與Stream流的使用方法
Lambda表達(dá)式,基于Lambda所帶來的函數(shù)式編程,又引入了一個全新的Stream概念,用于解決集合類庫既有的弊端,Lambda 允許把函數(shù)作為一個方法的參數(shù)(函數(shù)作為參數(shù)傳遞進(jìn)方法中)。使用 Lambda 表達(dá)式可以使代碼變的更加簡潔緊湊2022-04-04
java中synchronized(同步代碼塊和同步方法)詳解及區(qū)別
這篇文章主要介紹了 java中synchronized(同步代碼塊和同步方法)詳解及區(qū)別的相關(guān)資料,需要的朋友可以參考下2017-02-02
Spring Security中的Servlet過濾器體系代碼分析
這篇文章主要介紹了Spring Security中的Servlet過濾器體系,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07

