Java分布式鎖幾種常見(jiàn)的實(shí)現(xiàn)方式
前言
分布式鎖主要用于解決在分布式系統(tǒng)中多個(gè)節(jié)點(diǎn)對(duì)共享資源進(jìn)行并發(fā)訪問(wèn)時(shí)可能出現(xiàn)的競(jìng)爭(zhēng)問(wèn)題。
在Java中實(shí)現(xiàn)分布式鎖的方式主要有以下幾種:
基于數(shù)據(jù)庫(kù)的實(shí)現(xiàn)
(1)唯一約束
通過(guò)數(shù)據(jù)庫(kù)表中設(shè)置唯一鍵約束來(lái)保證只有一個(gè)客戶(hù)端可以獲取到鎖。通常會(huì)有一張專(zhuān)門(mén)的鎖表,包含鎖名稱(chēng)和鎖的持有者信息等字段。
CREATE TABLE distributed_locks (
lock_name VARCHAR(255) PRIMARY KEY,
owner_info VARCHAR(255) NOT NULL
);該代碼示例展示了如何使用上述表來(lái)獲取和釋放鎖,并執(zhí)行相應(yīng)的業(yè)務(wù)邏輯(例如更新庫(kù)存),在請(qǐng)求A對(duì)業(yè)務(wù)進(jìn)行操作的時(shí)候,假設(shè)請(qǐng)求B也進(jìn)入到此方法,則會(huì)由于鎖表的唯一索引lock_name而導(dǎo)致插入失敗,導(dǎo)致其操作被拒絕,而主鍵則需要針對(duì)不同業(yè)務(wù)場(chǎng)景設(shè)置,不同業(yè)務(wù)場(chǎng)景不會(huì)觸發(fā)鎖機(jī)制。
public void operateStock(Integer num) {
String lockName = "product_stock_update_lock"; // 鎖名稱(chēng),如:業(yè)務(wù)編碼
String ownerId = "d2d00005sa5s512"; // 當(dāng)前實(shí)例標(biāo)識(shí)符,如:用戶(hù)id
// SQL == "INSERT INTO distributed_locks(lock_name, owner_info) VALUES (?, ?)"
// 加鎖插入成功返回true
boolean gotLock = acquireLock(lockName, ownerId);
if (gotLock) {
try {
// 執(zhí)行業(yè)務(wù)邏輯,如更新指定物料庫(kù)存
//SQL == "UPDATE products SET stock = stock - #{num} WHERE product_id = '001' AND stock > 0";
updateProductStock(num);
} catch (Exception e) {
e.printStackTrace();
} finally {
// SQL == "DELETE FROM distributed_locks WHERE lock_name = ? AND owner_info = ?"
releaseLock(lockName, ownerId);
}
} else {
System.out.println("Failed to acquire lock.");
}
}(2)行鎖或表鎖
在查詢(xún)語(yǔ)句后面增加for update,數(shù)據(jù)庫(kù)會(huì)在查詢(xún)過(guò)程中給數(shù)據(jù)庫(kù)表增加排他鎖,當(dāng)某條記錄被加上排他鎖之后,其他線程無(wú)法再在該行記錄上增加排他鎖,而事務(wù)提交后會(huì)自動(dòng)釋放鎖。
@Autowired
private ProductMapper productMapper;
@Transactional
public void updateProductStock(String productId) {
//查詢(xún)并加鎖 @Select("SELECT * FROM products WHERE product_id = #{productId} FOR UPDATE")
Product product = productMapper.selectForUpdate(productId);
if (product == null || product.getStock() <= 0) {
System.out.println("庫(kù)存不足或產(chǎn)品不存在");
return;
}
// 更新庫(kù)存 @Update("UPDATE products SET stock = stock - 1 WHERE product_id = #{productId}")
int rowsAffected = productMapper.updateStock(productId);
if (rowsAffected > 0) {
System.out.println("庫(kù)存更新成功");
} else {
System.out.println("未能成功更新庫(kù)存");
}
}(3)version樂(lè)觀鎖
樂(lè)觀鎖是一種處理并發(fā)控制的策略,它假設(shè)數(shù)據(jù)沖突的概率較低,因此不會(huì)在讀取數(shù)據(jù)時(shí)加鎖。相反,它會(huì)在更新數(shù)據(jù)時(shí)檢查數(shù)據(jù)是否被其他事務(wù)修改過(guò)。這通常通過(guò)一個(gè)版本號(hào)(version)字段或時(shí)間戳來(lái)實(shí)現(xiàn)。
讀取數(shù)據(jù):當(dāng)一個(gè)事務(wù)讀取數(shù)據(jù)時(shí),同時(shí)獲取該記錄的版本號(hào)或時(shí)間戳。
修改數(shù)據(jù):當(dāng)事務(wù)嘗試更新數(shù)據(jù)時(shí),它會(huì)使用版本號(hào)作為條件的一部分進(jìn)行更新操作。
CREATE TABLE products (
product_id VARCHAR(255) PRIMARY KEY,
stock INT NOT NULL,
version INT DEFAULT 0
); @Autowired
private ProductMapper productMapper;
@Transactional
public void updateProductStock(String productId) {
// 查詢(xún)并加鎖
// @Select("SELECT product_id,stock,version FROM products WHERE product_id = #{productId} FOR UPDATE")
Product product = productMapper.selectForUpdate(productId);
if (product == null || product.getStock() <= 0) {
System.out.println("庫(kù)存不足或產(chǎn)品不存在");
return;
}
// 嘗試更新庫(kù)存,并檢查版本號(hào)
// @Update("UPDATE products SET stock = stock - 1, version = version + 1 WHERE product_id = #{productId} AND version = #{version}")
int rowsAffected = productMapper.updateStockWithVersion(productId, product.getVersion());
if (rowsAffected > 0) {
System.out.println("庫(kù)存更新成功");
} else {
System.out.println("庫(kù)存更新失敗,可能已被其他事務(wù)更新");
// 這里可以根據(jù)業(yè)務(wù)需求選擇重試或者拋出異常等處理方式
}
}基于Redis的實(shí)現(xiàn)
使用Redis實(shí)現(xiàn)分布式鎖是一種高效且廣泛采用的方法,特別適合于需要高吞吐量和低延遲的場(chǎng)景。Redis通過(guò)其原子操作命令提供了一種簡(jiǎn)單而有效的機(jī)制來(lái)實(shí)現(xiàn)分布式鎖。
SET resource_name my_random_value NX PX 30000
NX 表示僅在鍵不存在時(shí)設(shè)置鍵。
PX 30000 設(shè)置鍵的過(guò)期時(shí)間為30秒,防止死鎖(如果客戶(hù)端崩潰或網(wǎng)絡(luò)問(wèn)題導(dǎo)致無(wú)法釋放鎖)。
RedisTemplate實(shí)現(xiàn)分布式鎖
編寫(xiě)工具類(lèi)
@Component
public class MyRedisLock {
private final RedisTemplate<String, String> redisTemplate;
@Autowired
// 自Spring 4.3起,如果只有一個(gè)構(gòu)造函數(shù),可以省略@Autowired注解
public MyRedisLock(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
// Lua 腳本用于釋放鎖
private static final String UNLOCK_SCRIPT =
"if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end";
// Lua 腳本用于續(xù)期鎖
private static final String RENEW_LOCK_SCRIPT =
"if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('PEXPIRE', KEYS[1], ARGV[2]) else return 0 end";
// 鎖的前綴,用于區(qū)分不同的鎖
private static final String LOCK_PREFIX = "lock:";
// 續(xù)期鎖的時(shí)間間隔(毫秒)
private static final long RENEW_INTERVAL_MS = 2000;
/**
嘗試獲取鎖
@param lockKey 鎖的鍵名
@param expireMs 鎖的過(guò)期時(shí)間(毫秒)
@param operateId 鎖的值
@return 如果成功獲取鎖,返回 true;否則返回 false
**/
public boolean tryLock(String lockKey, long expireMs, String operateId) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockKey, operateId, expireMs, TimeUnit.MILLISECONDS); // setIfAbsent實(shí)現(xiàn)上鎖
return result != null && result;
}
/**
嘗試獲取鎖并自動(dòng)續(xù)期
@param lockKey 鎖的鍵名
@param expireMs 鎖的過(guò)期時(shí)間(毫秒)
@param operateId 鎖的值
@return 如果成功獲取鎖,返回鎖的唯一標(biāo)識(shí)符;否則返回 null
**/
public boolean tryLockWithRenewal(String lockKey, long expireMs, String operateId) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockKey, operateId, expireMs, TimeUnit.MILLISECONDS); // setIfAbsent實(shí)現(xiàn)上鎖
if (result != null && result) {
// 啟動(dòng)續(xù)期線程
startRenewalThread(lockKey, operateId, expireMs);
return true;
}
return false;
}
/**
釋放鎖
@param lockKey 鎖的鍵名
@param operateId 鎖的值(用于驗(yàn)證是否是持有鎖的客戶(hù)端)
@return 如果成功釋放鎖,返回 true;否則返回 false
**/
public boolean unlock(String lockKey, String operateId) {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
// 執(zhí)行l(wèi)ua腳本,參數(shù)解釋下:
// 第一個(gè)參數(shù)script為lua腳本
// 第二個(gè)參數(shù)為key的集合,會(huì)依次替換lua腳本中的KEYS[]數(shù)組的數(shù)據(jù),默認(rèn)1開(kāi)始
// 第三個(gè)參數(shù)為參數(shù)集合,會(huì)依次替換lua腳本中的ARGVS[]數(shù)組的數(shù)據(jù),默認(rèn)1開(kāi)始
Long result = redisTemplate.execute(script, Collections.singletonList(LOCK_PREFIX + lockKey), operateId);
return result != null && result == 1L;
}
/**
自動(dòng)續(xù)期鎖
@param lockKey 鎖的鍵名
@param operateId 鎖的值(用于驗(yàn)證是否是持有鎖的客戶(hù)端)
@param expireMs 鎖的過(guò)期時(shí)間(毫秒)
@return 如果成功續(xù)期,返回 true;否則返回 false
**/
public boolean renewLock(String lockKey, String operateId, long expireMs) {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(RENEW_LOCK_SCRIPT, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(LOCK_PREFIX + lockKey), operateId, String.valueOf(expireMs));
return result != null && result == 1L;
}
/**
啟動(dòng)續(xù)期線程
@param lockKey 鎖的鍵名
@param operateId 鎖的值
@param expireMs 鎖的過(guò)期時(shí)間(毫秒)
**/
private void startRenewalThread(final String lockKey, final String operateId, final long expireMs) {
Thread renewalThread = new Thread(() -> {
try {
while (true) {
// 每隔一段時(shí)間續(xù)期一次,需要確保間隔時(shí)間小于過(guò)期時(shí)間,過(guò)期或釋放鎖將無(wú)法續(xù)費(fèi)
Thread.sleep(RENEW_INTERVAL_MS);
if (!renewLock(lockKey, operateId, expireMs)) { // 續(xù)鎖操作
// 如果續(xù)期失敗,直接結(jié)束守護(hù)線程,停止鎖續(xù)期行為。
// 這里說(shuō)明下,刪除鎖和續(xù)鎖都需要驗(yàn)證lockValue,這個(gè)上鎖時(shí)通過(guò)uuid創(chuàng)建的,其他線程肯定獲取的都不一致,這樣確保續(xù)鎖行為只能是自己的守護(hù)線程才可以操作;如果續(xù)鎖失敗了,則說(shuō)明是主線程完成任務(wù)刪除了key鎖,所以這里守護(hù)線程也可以結(jié)束了
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
renewalThread.setDaemon(true); // 設(shè)置為守護(hù)線程
renewalThread.start();
}
}代碼示例:常規(guī)鎖
@Autowired
private MyRedisLock redisLock;
// 鎖的默認(rèn)過(guò)期時(shí)間(毫秒)
private static final long DEFAULT_EXPIRE_TIME_MS = 5000;
public void testLock() throws InterruptedException {
String lockKey = "my_distributed_lock";
//進(jìn)程標(biāo)識(shí)ID
String operateId = java.util.UUID.randomUUID().toString();
if (redisLock.tryLock(lockKey, DEFAULT_EXPIRE_TIME_MS, operateId)) {
System.out.println(" 獲取到鎖,開(kāi)始執(zhí)行任務(wù)...");
try {
// 執(zhí)行業(yè)務(wù)邏輯
Thread.sleep(5000); // 模擬耗時(shí)操作
System.out.println(Thread.currentThread().getName() + " 任務(wù)執(zhí)行完成");
} finally {
if (redisLock.unlock(lockKey, operateId)) {
System.out.println(Thread.currentThread().getName() + " 成功釋放鎖");
} else {
System.out.println(Thread.currentThread().getName() + " 釋放鎖失敗,鎖可能已被其他客戶(hù)端刪除");
}
}
} else {
System.out.println(Thread.currentThread().getName() + " 未能獲取到鎖");
}
}代碼示例:續(xù)費(fèi)線程鎖
@Autowired
private MyRedisLock redisLock;
// 鎖的默認(rèn)過(guò)期時(shí)間(毫秒)
private static final long DEFAULT_EXPIRE_TIME_MS = 5000;
public void testLock() throws InterruptedException {
String lockKey = "my_distributed_lock";
//進(jìn)程標(biāo)識(shí)ID
String operateId = java.util.UUID.randomUUID().toString();
// 嘗試獲取鎖并自動(dòng)續(xù)期
if ( redisLock.tryLockWithRenewal(lockKey, DEFAULT_EXPIRE_TIME_MS, operateId)) {
try {
// 執(zhí)行業(yè)務(wù)邏輯
Thread.sleep(8000);
System.out.println(Thread.currentThread().getName() + " 任務(wù)執(zhí)行完成");
} finally {
// 釋放鎖
boolean unlockSuccess = redisLock.unlock(lockKey, operateId);
if (unlockSuccess) {
System.out.println(Thread.currentThread().getName() + " 成功釋放鎖");
} else {
System.out.println(Thread.currentThread().getName() + " 釋放鎖失敗,鎖可能已被其他客戶(hù)端刪除");
}
}
} else {
System.out.println(Thread.currentThread().getName() + " 未能獲取到鎖");
}
}基于Zookeeper的實(shí)現(xiàn)
基本原理
- 創(chuàng)建臨時(shí)順序節(jié)點(diǎn):客戶(hù)端嘗試在特定路徑下創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn)(例如/locks/lock-),以表示對(duì)鎖的競(jìng)爭(zhēng)。
- 判斷是否獲得鎖:檢查自己創(chuàng)建的節(jié)點(diǎn)是否是該路徑下所有子節(jié)點(diǎn)中最小的一個(gè)。如果是,則表示獲得了鎖;如果不是,則監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn)(即比自己小的那個(gè)節(jié)點(diǎn))的變化。
- 監(jiān)聽(tīng)前一節(jié)點(diǎn)刪除事件:如果當(dāng)前節(jié)點(diǎn)不是最小節(jié)點(diǎn),則需要等待直到前一節(jié)點(diǎn)被刪除(意味著前一客戶(hù)端釋放了鎖),然后重新檢查是否可以獲得鎖。
- 釋放鎖:當(dāng)業(yè)務(wù)邏輯執(zhí)行完畢后,客戶(hù)端可以主動(dòng)刪除自己創(chuàng)建的節(jié)點(diǎn)來(lái)釋放鎖。此外,由于使用的是臨時(shí)節(jié)點(diǎn),如果客戶(hù)端崩潰或與ZooKeeper斷開(kāi)連接,該節(jié)點(diǎn)也會(huì)自動(dòng)被刪除。
引入依賴(lài)(一個(gè)用于簡(jiǎn)化ZooKeeper操作的框架)
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>代碼示例
private static final String ZK_ADDRESS = "localhost:2181";
private static final String LOCK_PATH = "/distributed_lock_example";
public void operate() throws Exception {
// 創(chuàng)建CuratorFramework實(shí)例
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
client.start();
// 使用InterProcessMutex作為分布式鎖
InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);
try {
// 獲取鎖
lock.acquire();
//業(yè)務(wù)操作代碼
this.performBusinessLogic();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock.isAcquiredInThisProcess()) {
try {
// 釋放鎖
lock.release();
System.out.println("Lock released.");
} catch (Exception e) {
e.printStackTrace();
}
}
// 關(guān)閉客戶(hù)端
client.close();
}
}
private static void performBusinessLogic() throws InterruptedException {
// 模擬業(yè)務(wù)邏輯處理
System.out.println("Performing some operations...");
Thread.sleep(5000); // 暫停5秒模擬長(zhǎng)時(shí)間操作
}基于etcd的實(shí)現(xiàn)(僅了解)
類(lèi)似于Zookeeper,etcd也提供了類(lèi)似的分布式協(xié)調(diào)服務(wù),可以通過(guò)創(chuàng)建租約(lease)并附加到關(guān)鍵路徑上來(lái)實(shí)現(xiàn)分布式鎖。Etcd支持事務(wù)、watch機(jī)制等功能,使得它同樣適用于構(gòu)建分布式鎖。
總結(jié)
到此這篇關(guān)于Java分布式鎖幾種常見(jiàn)的實(shí)現(xiàn)方式的文章就介紹到這了,更多相關(guān)Java分布式鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
mybatis 插件: 打印 sql 及其執(zhí)行時(shí)間實(shí)現(xiàn)方法
下面小編就為大家?guī)?lái)一篇mybatis 插件: 打印 sql 及其執(zhí)行時(shí)間實(shí)現(xiàn)方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-06-06
教你用IDEA配置JUnit并進(jìn)行單元測(cè)試
今天教各位小伙伴怎么用IDEA配置JUnit并進(jìn)行單元測(cè)試,文中有非常詳細(xì)的圖文介紹及代碼示例,對(duì)正在學(xué)習(xí)IDEA的小伙伴有很好的幫助,需要的朋友可以參考下2021-05-05
Spring中@Autowired注解在不同方法的寫(xiě)法示例
這篇文章主要為大家介紹了Spring中@Autowired注解在不同方法的寫(xiě)法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01
Springboot配置管理Externalized?Configuration深入探究
這篇文章主要介紹了Springboot配置管Externalized?Configuration深入探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
淺談Java實(shí)體對(duì)象的三種狀態(tài)以及轉(zhuǎn)換關(guān)系
這篇文章主要介紹了淺談Java實(shí)體對(duì)象的三種狀態(tài)以及轉(zhuǎn)換關(guān)系,具有一定參考價(jià)值,需要的朋友可以,看看。。2017-11-11
spring Retryable注解實(shí)現(xiàn)重試詳解
這篇文章主要介紹了spring Retryable注解實(shí)現(xiàn)重試詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-09-09

