SpringBoot整合Redisson實現(xiàn)分布式鎖
Redisson是架設在redis基礎上的一個Java駐內存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。充分的利用了Redis鍵值數(shù)據(jù)庫提供的一系列優(yōu)勢,基于Java實用工具包中常用接口,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協(xié)調單機多線程并發(fā)程序的工具包獲得了協(xié)調分布式多機多線程并發(fā)系統(tǒng)的能力,大大降低了設計和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時結合各富特色的分布式服務,更進一步簡化了分布式環(huán)境中程序相互之間的協(xié)作。
Github地址:https://github.com/redisson/redisson
一、添加依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- springboot整合redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- springboot整合redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
</dependencies>
二、redis配置文件
server:
port: 8000
spring:
redis:
host: localhost
port: 6379
password: null
database: 1
timeout: 30000
三、新建配置類
@Configuration
public class MyRedissonConfig {
@Value("${spring.redis.host}")
String redisHost;
@Value("${spring.redis.port}")
String redisPort;
@Value("${spring.redis.password}")
String redisPassword;
@Value("${spring.redis.timeout}")
Integer redisTimeout;
/**
* Redisson配置
* @return
*/
@Bean
RedissonClient redissonClient() {
//1、創(chuàng)建配置
Config config = new Config();
redisHost = redisHost.startsWith("redis://") ? redisHost : "redis://" + redisHost;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(redisHost + ":" + redisPort)
.setTimeout(redisTimeout);
if (StringUtils.isNotBlank(redisPassword)) {
serverConfig.setPassword(redisPassword);
}
return Redisson.create(config);
}
}
//單機
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
//主從
Config config = new Config();
config.useMasterSlaveServers()
.setMasterAddress("127.0.0.1:6379")
.addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
.addSlaveAddress("127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
//哨兵
Config config = new Config();
config.useSentinelServers()
.setMasterName("mymaster")
.addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
.addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
//集群
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // cluster state scan interval in milliseconds
.addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
.addNodeAddress("127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);
四、使用分布式鎖
可重入鎖
基于Redis的Redisson分布式可重入鎖RLock對象實現(xiàn)了java.util.concurrent.locks.Lock接口。
@RequestMapping("/redisson")
public String testRedisson(){
//獲取分布式鎖,只要鎖的名字一樣,就是同一把鎖
RLock lock = redissonClient.getLock("lock");
//加鎖(阻塞等待),默認過期時間是無限期
lock.lock();
try{
//如果業(yè)務執(zhí)行過長,Redisson會自動給鎖續(xù)期
Thread.sleep(1000);
System.out.println("加鎖成功,執(zhí)行業(yè)務邏輯");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//解鎖,如果業(yè)務執(zhí)行完成,就不會繼續(xù)續(xù)期
lock.unlock();
}
return "Hello Redisson!";
}
如果拿到分布式鎖的節(jié)點宕機,且這個鎖正好處于鎖住的狀態(tài)時,會出現(xiàn)鎖死的狀態(tài),為了避免這種情況的發(fā)生,鎖都會設置一個過期時間。這樣也存在一個問題,一個線程拿到了鎖設置了30s超時,在30s后這個線程還沒有執(zhí)行完畢,鎖超時釋放了,就會導致問題,Redisson給出了自己的答案,就是 watch dog 自動延期機制。
Redisson提供了一個監(jiān)控鎖的看門狗,它的作用是在Redisson實例被關閉前,不斷的延長鎖的有效期,也就是說,如果一個拿到鎖的線程一直沒有完成邏輯,那么看門狗會幫助線程不斷的延長鎖超時時間,鎖不會因為超時而被釋放。
默認情況下,看門狗的續(xù)期時間是30s,也可以通過修改Config.lockWatchdogTimeout來另行指定。
另外Redisson 還提供了可以指定leaseTime參數(shù)的加鎖方法來指定加鎖的時間。超過這個時間后鎖便自動解開了,不會延長鎖的有效期。
在RedissonLock類的renewExpiration()方法中,會啟動一個定時任務每隔30/3=10秒給鎖續(xù)期。如果業(yè)務執(zhí)行期間,應用掛了,那么不會自動續(xù)期,到過期時間之后,鎖會自動釋放。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
另外Redisson還提供了leaseTime的參數(shù)來指定加鎖的時間。超過這個時間后鎖便自動解開了。
// 加鎖以后10秒鐘自動解鎖 // 無需調用unlock方法手動解鎖 lock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
如果指定了鎖的超時時間,底層直接調用lua腳本,進行占鎖。如果超過leaseTime,業(yè)務邏輯還沒有執(zhí)行完成,則直接釋放鎖,所以在指定leaseTime時,要讓leaseTime大于業(yè)務執(zhí)行時間。RedissonLock類的tryLockInnerAsync()方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
讀寫鎖
分布式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處于加鎖狀態(tài)。在讀寫鎖中,讀讀共享、讀寫互斥、寫寫互斥。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
讀寫鎖測試類,當訪問write接口時,read接口會被阻塞住。
@RestController
public class TestController {
@Autowired
RedissonClient redissonClient;
@Autowired
StringRedisTemplate redisTemplate;
@RequestMapping("/write")
public String write(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
RLock writeLock = readWriteLock.writeLock();
String s = UUID.randomUUID().toString();
writeLock.lock();
try {
redisTemplate.opsForValue().set("wr-lock-key", s);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
writeLock.unlock();
}
return s;
}
@RequestMapping("/read")
public String read(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
RLock readLock = readWriteLock.readLock();
String s = "";
readLock.lock();
try {
s = redisTemplate.opsForValue().get("wr-lock-key");
} finally {
readLock.unlock();
}
return s;
}
}
信號量(Semaphore)
基于Redis的Redisson的分布式信號量(Semaphore)Java對象RSemaphore采用了與java.util.concurrent.Semaphore類似的接口和用法
關于信號量的使用你們能夠想象一下這個場景,有三個停車位,當三個停車位滿了后,其余車就不停了。能夠把車位比做信號,如今有三個信號,停一次車,用掉一個信號,車離開就是釋放一個信號。

咱們用 Redisson 來演示上述停車位的場景。
先定義一個占用停車位的方法:
/**
* 停車,占用停車位
* 總共 3 個車位
*/
@ResponseBody
@RequestMapping("park")
public String park() throws InterruptedException {
// 獲取信號量(停車場)
RSemaphore park = redisson.getSemaphore("park");
// 獲取一個信號(停車位)
park.acquire();
return "OK";
}
再定義一個離開車位的方法:
/**
* 釋放車位
* 總共 3 個車位
*/
@ResponseBody
@RequestMapping("leave")
public String leave() throws InterruptedException {
// 獲取信號量(停車場)
RSemaphore park = redisson.getSemaphore("park");
// 釋放一個信號(停車位)
park.release();
return "OK";
}
為了簡便,我用 Redis 客戶端添加了一個 key:“park”,值等于 3,表明信號量為 park,總共有三個值。

而后用 postman 發(fā)送 park 請求占用一個停車位。

而后在 redis 客戶端查看 park 的值,發(fā)現(xiàn)已經(jīng)改成 2 了。繼續(xù)調用兩次,發(fā)現(xiàn) park 的等于 0,當調用第四次的時候,會發(fā)現(xiàn)請求一直處于等待中,說明車位不夠了。若是想要不阻塞,能夠用 tryAcquire 或 tryAcquireAsync。
咱們再調用離開車位的方法,park 的值變?yōu)榱?1,表明車位剩余 1 個。
注意:屢次執(zhí)行釋放信號量操做,剩余信號量會一直增長,而不是到 3 后就封頂了。
閉鎖(CountDownLatch)
CountDownLatch作用:某一線程,等待其他線程執(zhí)行完畢之后,自己再繼續(xù)執(zhí)行。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他線程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
在TestController中添加測試方法,訪問close接口時,調用await()方法進入阻塞狀態(tài),直到有三次訪問release接口時,close接口才會返回。
@RequestMapping("/close")
public String close() throws InterruptedException {
RCountDownLatch close = redissonClient.getCountDownLatch("close");
close.trySetCount(3);
close.await();
return "close";
}
@RequestMapping("/release")
public String release(){
RCountDownLatch close = redissonClient.getCountDownLatch("close");
close.countDown();
return "release";
}
到此這篇關于SpringBoot整合Redisson實現(xiàn)分布式鎖的文章就介紹到這了,更多相關SpringBoot Redisson分布式鎖內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot中使用@scheduled定時執(zhí)行任務的坑
本文主要介紹了SpringBoot中使用@scheduled定時執(zhí)行任務的坑,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-05-05
Spring中自定義數(shù)據(jù)類型轉換的方法詳解
Spring3引入了一個core.onvert包,提供一個通用類型轉換系統(tǒng)。在Spring容器中,可以使用這個系統(tǒng)作為PropertyEditor實現(xiàn)的替代,將外部化的bean屬性值字符串轉換為所需的屬性類型。本文將詳解這一系統(tǒng)的使用方法,需要的可以參考一下2022-06-06
selenium + ChromeDriver安裝及使用方法
這篇文章主要介紹了selenium + ChromeDriver安裝及使用方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-06-06

