MySql實現(xiàn)分布式鎖的示例代碼
本篇我們使用mysql實現(xiàn)一個分布式鎖。
環(huán)境:mysql8,navicat,maven,springboot2.3.11,mybatis-plus
分布式鎖的功能
1,分布式鎖使用者位于不同的機(jī)器中,鎖獲取成功之后,才可以對共享資源進(jìn)行操作
2,鎖具有重入的功能:即一個使用者可以多次獲取某個鎖
3,獲取鎖有超時的功能:即在指定的時間內(nèi)去嘗試獲取鎖,超過了超時時間,如果還未獲取成功,則返回獲取失敗
4,能夠自動容錯,比如:A機(jī)器獲取鎖lock1之后,在釋放鎖lock1之前,A機(jī)器掛了,導(dǎo)致鎖lock1未釋放,結(jié)果會lock1一直被A機(jī)器占有著,遇到這種情況時,分布式鎖要能夠自動解決,可以這么做:持有鎖的時候可以加個持有超時時間,超過了這個時間還未釋放的,其他機(jī)器將有機(jī)會獲取鎖
預(yù)備技能:樂觀鎖
通常我們修改表中一條數(shù)據(jù)過程如下:
t1:select獲取記錄R1 t2:對R1進(jìn)行編輯 t3:update R1
我們來看一下上面的過程存在的問題:
如果A、B兩個線程同時執(zhí)行到t1,他們倆看到的R1的數(shù)據(jù)一樣,然后都對R1進(jìn)行編輯,然后去執(zhí)行t3,最終2個線程都會更新成功,后面一個線程會把前面一個線程update的結(jié)果給覆蓋掉,這就是并發(fā)修改數(shù)據(jù)存在的問題。
我們可以在表中新增一個版本號,每次更新數(shù)據(jù)時候?qū)姹咎栕鳛闂l件,并且每次更新時候版本號+1,過程優(yōu)化一下,如下:
t1:打開事務(wù)start transaction
t2:select獲取記錄R1,聲明變量v=R1.version
t3:對R1進(jìn)行編輯
t4:執(zhí)行更新操作
update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
t5:t4中的update會返回影響的行數(shù),我們將其記錄在count中,然后根據(jù)count來判斷提交還是回滾
if(count==1){
//提交事務(wù)
commit;
}else{
//回滾事務(wù)
rollback;
}
上面重點在于步驟t4,當(dāng)多個線程同時執(zhí)行到t1,他們看到的R1是一樣的,但是當(dāng)他們執(zhí)行到t4的時候,數(shù)據(jù)庫會對update的這行記錄加鎖,確保并發(fā)情況下排隊執(zhí)行,所以只有第一個的update會返回1,其他的update結(jié)果會返回0,然后后面會判斷count是否為1,進(jìn)而對事務(wù)進(jìn)行提交或者回滾??梢酝ㄟ^count的值知道修改數(shù)據(jù)是否成功了。
上面這種方式就樂觀鎖。我們可以通過樂觀鎖的方式確保數(shù)據(jù)并發(fā)修改過程中的正確性。
使用mysql實現(xiàn)分布式鎖
我們創(chuàng)建一個分布式鎖表,如下
DROP TABLE IF EXISTS t_lock; create table t_lock( lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖唯一標(biāo)志', request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來標(biāo)識請求對象的', lock_count INT NOT NULL DEFAULT 0 COMMENT '當(dāng)前上鎖次數(shù)', timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時時間', version INT NOT NULL DEFAULT 0 COMMENT '版本號,每次更新+1' )COMMENT '鎖信息表';
java代碼如下
mapper接口
package com.shiguiwu.springmybatis.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import org.springframework.stereotype.Repository;
/**
* @description: 鎖mapper
* @author: stone
* @date: Created by 2021/5/30 11:12
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.mapper
*/
@Repository
public interface LockMapper extends BaseMapper<LockModel> {
}
鎖對象model
package com.shiguiwu.springmybatis.lock.model;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.Version;
import lombok.Data;
/**
* @description: 鎖模型
* @author: stone
* @date: Created by 2021/9/10 11:13
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.lock.model
*/
@Data
@TableName("t_lock")
public class LockModel {
/**
* 鎖的唯一值
*/
@TableId
private String lockKey;
/**
* 請求id,同一個線程里請求id一樣
*/
private String requestId;
//鎖次數(shù)
private Integer lockCount;
//鎖超時
private Long timeout;
//樂觀鎖版本
@Version
private Integer version;
}
鎖接口
package com.shiguiwu.springmybatis.lock;
/**
* @description: 鎖接口
* @author: stone
* @date: Created by 2021/9/10 11:40
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.lock
*/
public interface ILock<T> {
/**
* 獲取分布式鎖,支持重入
* @param lockKey 鎖可以
* @param lockTimeout 持有鎖的有效時間,防止死鎖
* @param getTimeout 獲取鎖超時時間,
* @return 是否鎖成功
*/
public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception;
/**
* 解鎖
* @param lockKey 鎖key
*
*/
public void unlock(String lockKey);
/**
* 重置鎖對象
* @param t 鎖對象
* @return 返回鎖記錄
*/
public int restLock(T t);
}
鎖的實現(xiàn)代碼如下
package com.shiguiwu.springmybatis.lock;
import cn.hutool.core.util.StrUtil;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import com.shiguiwu.springmybatis.mapper.LockMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @description: mysql實現(xiàn)分布式鎖
* @author: stone
* @date: Created by 2021/9/10 11:09
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.lock
*/
@Component
@Slf4j
public class MysqlLock implements ILock<LockModel>{
static ThreadLocal<String> requestIds = new ThreadLocal<>();
@Autowired
private LockMapper lockMapper;
public String getRequestId() {
String requestId = requestIds.get();
if (StrUtil.isBlank(requestId)) {
requestId = UUID.randomUUID().toString();
requestIds.set(requestId);
}
log.info("獲取到的requestId===> {}", requestId);
return requestId;
}
/**
* 獲取鎖
* @param lockKey 鎖可以
* @param lockTimeout 持有鎖的有效時間,防止死鎖
* @param getTimeout 獲取鎖超時時間,
* @return
*/
@Override
public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception {
log.info(" lock start =======================> {}",lockKey);
//從local中獲取 請求id
String requestId = this.getRequestId();
//獲取鎖的結(jié)果
boolean lockResult = false;
//開始時間
long startTime = System.currentTimeMillis();
while (true) {
LockModel lockModel = lockMapper.selectById(lockKey);
if (Objects.nonNull(lockModel)) {
//獲取鎖對象的請求id
String reqId = lockModel.getRequestId();
//如果是空,表示改鎖未被占有
if (StrUtil.isBlank(reqId)) {
//馬上占有它
//設(shè)置請求id
lockModel.setRequestId(requestId);
//設(shè)置鎖次數(shù)
lockModel.setLockCount(1);
//設(shè)置超時時間,防止死鎖
lockModel.setTimeout(System.currentTimeMillis() + lockTimeout);
if (lockMapper.updateById(lockModel) == 1) {
lockResult = true;
break;
}
}
//如果request_id和表中request_id一樣表示鎖被當(dāng)前線程持有者,此時需要加重入鎖
else if (requestId.equals(reqId)) {
//可重入鎖
lockModel.setTimeout(System.currentTimeMillis() + lockTimeout);
//設(shè)置獲取初次
lockModel.setLockCount(lockModel.getLockCount() + 1);
if (lockMapper.updateById(lockModel) == 1) {
lockResult = true;
break;
}
}
//不為空,也不相等,說明是其他線程占有
else {
//鎖不是自己的,并且已經(jīng)超時了,則重置鎖,繼續(xù)重試
if (lockModel.getTimeout() < System.currentTimeMillis()) {
//未超時,繼續(xù)重試
this.restLock(lockModel);
}
//如果未超時,休眠100毫秒,繼續(xù)重試
else {
if (startTime + getTimeout > System.currentTimeMillis()) {
TimeUnit.MILLISECONDS.sleep(100);
}
else {
//防止長時間阻塞
break;
}
}
}
}
//如果是空,就插入一個鎖,重新嘗試獲取鎖
else {
lockModel = new LockModel();
//設(shè)置鎖key
lockModel.setLockKey(lockKey);
lockMapper.insert(lockModel);
}
}
log.info(" lock end =======================> {}",lockKey);
return lockResult;
}
/**
* 釋放鎖
* @param lockKey 鎖key
*/
@Override
public void unlock(String lockKey) {
LockModel lockModel = lockMapper.selectById(lockKey);
//獲取當(dāng)前線程的請求id
String reqId = this.getRequestId();
//獲取鎖次數(shù)
int count = 0;
//當(dāng)前線程requestId和庫中request_id一致 && lock_count>0,表示可以釋放鎖
if (Objects.nonNull(lockModel)
&& reqId.equals(lockModel.getRequestId())
&& (count = lockModel.getLockCount()) > 0) {
if (count == 1) {
//重置鎖
this.restLock(lockModel);
}
//重入鎖的問題,鎖的次數(shù)減一
else {
lockModel.setLockCount(lockModel.getLockCount() - 1);
//更新次數(shù)
lockMapper.updateById(lockModel);
}
}
}
/**
* 重置鎖
* @param lockModel 鎖對象
* @return 更新條數(shù)
*/
@Override
public int restLock(LockModel lockModel) {
lockModel.setLockCount(0);
lockModel.setRequestId("");
lockModel.setTimeout(0L);
return lockMapper.updateById(lockModel);
}
}
上面代碼中實現(xiàn)了文章開頭列的分布式鎖的所有功能,大家可以認(rèn)真研究下獲取鎖的方法:lock,釋放鎖的方法:unlock。
測試用例
package com.shiguiwu.springmybatis;
import com.shiguiwu.springmybatis.lock.ILock;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @description: 鎖測試
* @author: stone
* @date: Created by 2021/9/10 15:32
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis
*/
@SpringBootTest
@Slf4j
public class LockApplicationTests {
@Autowired
private ILock<LockModel> mysqlLock;
測試重復(fù)獲取和重復(fù)釋放
@Test
public void testRepeat() throws Exception {
for (int i = 0; i < 10; i++) {
mysqlLock.lock("key1", 10000L, 1000);
}
for (int i = 0; i < 10; i++) {
mysqlLock.unlock("key1");
}
}
// //獲取之后不釋放,超時之后被thread1獲取
@Test
public void testTimeout() throws Exception {
String lockKey = "key2";
mysqlLock.lock(lockKey, 5000L, 1000);
Thread thread1 = new Thread(() -> {
try {
mysqlLock.lock(lockKey, 5000L, 7000);
} catch (Exception e) {
e.printStackTrace();
} finally {
mysqlLock.unlock(lockKey);
}
}, "thread1");
thread1.start();
thread1.join();
}
}
test1方法測試了重入鎖的效果。
test2測試了主線程獲取鎖之后一直未釋放,持有鎖超時之后被thread1獲取到了
留給大家一個問題
上面分布式鎖還需要考慮一個問題:比如A機(jī)會獲取了key1的鎖,并設(shè)置持有鎖的超時時間為10秒,但是獲取鎖之后,執(zhí)行了一段業(yè)務(wù)操作,業(yè)務(wù)操作耗時超過10秒了,此時機(jī)器B去獲取鎖時可以獲取成功的,此時會導(dǎo)致A、B兩個機(jī)器都獲取鎖成功了,都在執(zhí)行業(yè)務(wù)操作,這種情況應(yīng)該怎么處理?大家可以思考一下然后留言,我們一起討論一下。
到此這篇關(guān)于MySql實現(xiàn)分布式鎖的示例代碼的文章就介紹到這了,更多相關(guān)MySql 分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Ubuntu查看修改mysql的登錄名和密碼、安裝phpmyadmin
這篇文章主要介紹了Ubuntu查看修改mysql的登錄名和密碼、安裝phpmyadmin,本文分步驟給大家講解的非常詳細(xì),需要的朋友可以參考下2019-11-11
Mysql 常用的時間日期及轉(zhuǎn)換函數(shù)小結(jié)
本文是腳本之家小編給大家總結(jié)的一些常用的mysql時間日期以及轉(zhuǎn)換函數(shù),非常不錯,具有一定的參考借鑒價值,需要的朋友參考下吧2018-05-05
MySQL數(shù)據(jù)定義語言DDL的基礎(chǔ)語句
這篇文章主要介紹了MySQL數(shù)據(jù)定義語言DDL的基礎(chǔ)語句,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08

