Redis實(shí)現(xiàn)分布式鎖的示例代碼
什么是分布式鎖
分布式鎖是一種應(yīng)用級(jí)的鎖,在分布式系統(tǒng)下一個(gè)應(yīng)用通常會(huì)有多個(gè)節(jié)點(diǎn)部署,那么在高并發(fā)的情況下,就可能出現(xiàn)同一時(shí)間多個(gè)節(jié)點(diǎn)多個(gè)線程同時(shí)處理同一條數(shù)據(jù),這種情況很容易出現(xiàn)數(shù)據(jù)不一致。那么分布式鎖的出現(xiàn)就是為了解決此問(wèn)題,它能夠保證一個(gè)代碼塊在同一時(shí)間只能被同一線程或同一個(gè)節(jié)點(diǎn)執(zhí)行。
分布式鎖有哪些
- 基于數(shù)據(jù)庫(kù):使用數(shù)據(jù)庫(kù)設(shè)計(jì)排他鎖或者共享鎖來(lái)實(shí)現(xiàn)。
- 基于緩存:由于緩存操作是原子性的,可以使用Redis或Memcached進(jìn)行實(shí)現(xiàn)分布式鎖。
- 基于Zookeeper:利用其臨時(shí)順序節(jié)點(diǎn)特性實(shí)現(xiàn)。
- 基于分布式協(xié)調(diào)服務(wù):使用Etcd或Consul的API實(shí)現(xiàn)。
沒(méi)種分布式鎖的實(shí)現(xiàn)原理也有些不同,今天我們主要基于Redis緩存進(jìn)一步說(shuō)明分布式鎖的實(shí)現(xiàn)。
適用場(chǎng)景
- 數(shù)據(jù)并發(fā)讀寫(xiě):當(dāng)多個(gè)節(jié)點(diǎn)需要對(duì)同一數(shù)據(jù)進(jìn)行操作時(shí),分布式鎖可以確保在任何時(shí)刻只有一個(gè)節(jié)點(diǎn)能夠執(zhí)行寫(xiě)操作,從而避免數(shù)據(jù)的不一致。
- 業(yè)務(wù)流程控制:在復(fù)雜的業(yè)務(wù)流程中,多個(gè)節(jié)點(diǎn)可能需要按特定順序執(zhí)行某些操作。可以使用分布式鎖來(lái)保證這些流程按順序執(zhí)行。
- 資源爭(zhēng)搶:當(dāng)多個(gè)節(jié)點(diǎn)需要爭(zhēng)搶有限的資源時(shí),分布式鎖可以避免資源競(jìng)爭(zhēng)導(dǎo)致的系統(tǒng)壓力增大或資源浪費(fèi)。
實(shí)現(xiàn)原理
使用Redis實(shí)現(xiàn)分布式鎖主要分為以下幾個(gè)步驟:
1.使用Redis的setNx方法,設(shè)置key-value到緩存中,由于Redis操作是原子性的,并且該方法操作若key在緩存中不存在,則會(huì)創(chuàng)建key并設(shè)值后返回狀態(tài)碼1,否則返回狀態(tài)碼0,所以我們只需根據(jù)返回的狀態(tài)碼就可以確定是否有線程正在占用資源,以此作為是否加鎖成功的依據(jù)。
2.使用Redis的delete方法,可以在流程執(zhí)行完成后對(duì)緩存進(jìn)行刪除,以此達(dá)到釋放鎖的目的,從而使其他線程可以加鎖進(jìn)行操作。
3.使用Redis的expire方法,設(shè)置緩存過(guò)期時(shí)間,主要用于防止一些故障導(dǎo)致緩存沒(méi)有釋放,而長(zhǎng)時(shí)間占用鎖從而導(dǎo)致死鎖,同時(shí)也可以為長(zhǎng)時(shí)間的任務(wù)進(jìn)行續(xù)期。
分布式鎖設(shè)計(jì)實(shí)現(xiàn)
這里我們主要通過(guò)SpringBoot已經(jīng)封裝好的RedisTemplate工具類進(jìn)行詳細(xì)講解。
引入maven依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.2.5.RELEASE</version> </dependency>
話不多說(shuō)直接上代碼
public class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
/**
* 鎖狀態(tài)
*/
private Boolean locked = false;
/**
* Redis操作工具
*/
private RedisTemplate redisTemplate;
/**
* 緩存過(guò)期時(shí)間(單位:秒)
*/
private Long expireTime = 30L;
/**
* 緩存key
*/
private String key;
public RedisLock(RedisTemplate redisTemplate, String key) {
this.redisTemplate = redisTemplate;
this.key = key;
}
public RedisLock(RedisTemplate redisTemplate, String key, Long expireTime) {
this(redisTemplate, key);
this.expireTime = expireTime;
}
public boolean tryLock(long intervalTimeMs, long timeoutMs) {
// 重試間隔時(shí)間,默認(rèn)100
if (intervalTimeMs <= 0) {
intervalTimeMs = 100L;
}
// 獲取鎖超時(shí)時(shí)間,默認(rèn)100
if (timeoutMs <= intervalTimeMs) {
timeoutMs = 100L;
}
while (timeoutMs >= 0) {
// 設(shè)置緩存,底層調(diào)用setNx方法,返回true即緩存設(shè)置成功,則表示加鎖成功
if (redisTemplate.opsForValue().setIfAbsent(key, "1", expireTime, TimeUnit.SECONDS)) {
locked = true;
return true;
}
// 否則按間隔時(shí)間進(jìn)行休眠后再重新嘗試加鎖
sleep(intervalTimeMs);
timeoutMs -= intervalTimeMs;
}
locked = false;
return false;
}
public void releaseLock() {
if (!locked) {
return;
}
// 僅已加鎖才進(jìn)行刪除
redisTemplate.delete(key);
locked = false;
}
/**
* 延長(zhǎng)過(guò)期時(shí)間
*/
public void renewal() {
redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
}
/**
* 自定義延長(zhǎng)過(guò)期時(shí)間
* @param renewalTime 延長(zhǎng)的時(shí)間(單位:秒)
*/
public void renewal(long renewalTime) {
redisTemplate.expire(key, renewalTime, TimeUnit.SECONDS);
}
private void sleep(long intervalTimeMs) {
try {
TimeUnit.MILLISECONDS.sleep(intervalTimeMs);
} catch (InterruptedException e) {
logger.error("tryLock:{} timeout exception", key);
}
}
}
public class RedisLockBuild {
public static RedisLock build(RedisTemplate redisTemplate, String key) {
RedisLock redisLock = new RedisLock(redisTemplate, key);
return redisLock;
}
public static RedisLock build(RedisTemplate redisTemplate, String key, long expireTime) {
RedisLock redisLock = new RedisLock(redisTemplate, key, expireTime);
return redisLock;
}
}從以上的兩個(gè)代碼類來(lái)看是不是覺(jué)得如此簡(jiǎn)單,主要流程點(diǎn)都有注釋,接下來(lái)我們就仔細(xì)說(shuō)明一下這兩個(gè)類的作用:
RedisLock: Redis分布式鎖工具類,主要提供以下幾個(gè)方法:
public boolean tryLock(long intervalTimeMs, long timeoutMs)
嘗試獲取鎖,并傳遞參數(shù)作為獲取不到鎖時(shí)進(jìn)行等待,以及等待每次重新獲取鎖的時(shí)間,在有效時(shí)間內(nèi)獲取到鎖就返回成功,否則返回失敗,業(yè)務(wù)側(cè)根據(jù)獲取鎖的結(jié)果進(jìn)行業(yè)務(wù)處理。
public void releaseLock()
釋放鎖,通常為了在鎖使用完成后對(duì)其進(jìn)行快速釋放,供其他線程進(jìn)行使用,所以在加鎖的代碼塊都需要使用try-finally進(jìn)行處理,必須主動(dòng)在finally處進(jìn)行鎖釋放。
public void renewal(long renewalTime)
為防止有個(gè)別處理時(shí)間較長(zhǎng)的流程,防止流程未處理完鎖就過(guò)期,提供此方法用于對(duì)鎖進(jìn)行續(xù)期,也就是對(duì)鎖重新設(shè)置過(guò)期時(shí)間,以達(dá)到延遲過(guò)期的效果,使用需要注意釋放鎖防止造成死鎖。
RedisLockBuild: 作為RedisLock的構(gòu)造器,主要方便我們構(gòu)造RedisLock對(duì)象,有需要的可根據(jù)個(gè)人所需進(jìn)行封裝。
使用案例
看似簡(jiǎn)單的東西,不動(dòng)手試一下怎么知道好不好用呢,這里我們就舉個(gè)栗子來(lái)說(shuō)明一下:
@RestController
@RequestMapping("/redis")
public class RedisController {
private static final Logger logger = LoggerFactory.getLogger(RedisController.class);
@Autowired
private RedisTemplate redisTemplate;
private Integer inventoryNumber = 10;
@PostMapping("/lock")
public String lock() {
CompletableFuture cf1 = CompletableFuture.runAsync(() -> increaseInventory(1));
CompletableFuture cf2 = CompletableFuture.runAsync(() -> decreaseInventory(1));
CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2);
cfAll.join();
logger.info("Inventory Number:{}", inventoryNumber);
return "test success";
}
/**
* 增加庫(kù)存
*/
@SneakyThrows
public void increaseInventory(int number) {
logger.info("----- increaseInventory start -----");
RedisLock redisLock = RedisLockBuild.build(redisTemplate, "inventory");
try {
if (redisLock.tryLock(500, 3000)) {
inventoryNumber += number;
logger.info("----- increaseInventory success sleep 2 seconds -----");
TimeUnit.SECONDS.sleep(2);
}
else {
logger.info("increaseInventory lock failed.");
}
}
finally {
redisLock.releaseLock();
}
logger.info("----- increaseInventory end -----");
}
/**
* 減少庫(kù)存
*/
@SneakyThrows
public void decreaseInventory(int number) {
logger.info("----- decreaseInventory start -----");
RedisLock redisLock = RedisLockBuild.build(redisTemplate, "inventory");
try {
if (redisLock.tryLock(500, 3000)) {
inventoryNumber -= number;
logger.info("----- decreaseInventory success sleep 2 seconds -----");
TimeUnit.SECONDS.sleep(2);
}
else {
logger.info("decreaseInventory lock failed.");
}
}
finally {
redisLock.releaseLock();
}
logger.info("----- decreaseInventory end -----");
}
}以上代碼我們簡(jiǎn)單寫(xiě)了個(gè)測(cè)試類,設(shè)置一個(gè)庫(kù)存變量inventoryNumber=10,兩個(gè)任務(wù)cf1、cf2,并進(jìn)行異步并發(fā)執(zhí)行,這兩個(gè)任務(wù)同時(shí)對(duì)庫(kù)存變量inventoryNumber進(jìn)行增加和遞減,此時(shí)我們細(xì)看increaseInventory、decreaseInventory這兩個(gè)方法里面的實(shí)現(xiàn),同時(shí)都對(duì)庫(kù)存修改的動(dòng)作進(jìn)行了加分布式鎖操作,方式并發(fā)修改數(shù)據(jù)出現(xiàn)數(shù)據(jù)不一致問(wèn)題。如此我們就成功實(shí)現(xiàn)了分布式鎖。

為了更加可觀,我們?cè)诖a里設(shè)置了休眠2秒的時(shí)間,上圖是我們的一次運(yùn)行結(jié)果,也就是無(wú)論哪個(gè)任務(wù)先獲取到鎖,另一個(gè)任務(wù)就要等休眠2秒過(guò)后釋放后才能夠獲取鎖,從而進(jìn)行庫(kù)存操作。
總結(jié)
使用分布式鎖時(shí)也是需要合理分析場(chǎng)景,并合理設(shè)置好鎖的時(shí)間,以達(dá)到鎖資源的合理分配,從而使我們的系統(tǒng)在高效運(yùn)行的情況下更加安全可靠。
到此這篇關(guān)于Redis實(shí)現(xiàn)分布式鎖的示例代碼的文章就介紹到這了,更多相關(guān)Redis 分布式鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis 對(duì)過(guò)期數(shù)據(jù)的處理方法
這篇文章主要介紹了Redis 對(duì)過(guò)期數(shù)據(jù)的處理,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10
jwt+redis實(shí)現(xiàn)登錄認(rèn)證的示例代碼
在登錄業(yè)務(wù)代碼中,當(dāng)用戶登錄成功時(shí),生成一個(gè)登錄憑證存儲(chǔ)到redis中,本文主要介紹了jwt+redis實(shí)現(xiàn)登錄認(rèn)證的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03
基于Redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)方法
這篇文章主要介紹了基于Redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)方法,Redis官方給出兩種思路,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-10-10
Redis集群服務(wù)器的實(shí)現(xiàn)(圖文步驟)
本文介紹了Redis集群服務(wù)器的優(yōu)勢(shì),為讀者提供了全面的Redis集群服務(wù)器知識(shí)和使用技巧,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09

