zookeeper實現(xiàn)分布式鎖
一、分布式鎖介紹
分布式鎖主要用于在分布式環(huán)境中保護(hù)跨進(jìn)程、跨主機(jī)、跨網(wǎng)絡(luò)的共享資源實現(xiàn)互斥訪問,以達(dá)到保證數(shù)據(jù)的一致性。
二、架構(gòu)介紹
在介紹使用Zookeeper實現(xiàn)分布式鎖之前,首先看當(dāng)前的系統(tǒng)架構(gòu)圖
解釋: 左邊的整個區(qū)域表示一個Zookeeper集群,locker是Zookeeper的一個持久節(jié)點,node_1、node_2、node_3是locker這個持久節(jié)點下面的臨時順序節(jié)點。client_1、client_2、client_n表示多個客戶端,Service表示需要互斥訪問的共享資源。
三、分布式鎖獲取思路
1.獲取分布式鎖的總體思路
在獲取分布式鎖的時候在locker節(jié)點下創(chuàng)建臨時順序節(jié)點,釋放鎖的時候刪除該臨時節(jié)點。客戶端調(diào)用createNode方法在locker下創(chuàng)建臨時順序節(jié)點,然后調(diào)用getChildren(“l(fā)ocker”)來獲取locker下面的所有子節(jié)點,注意此時不用設(shè)置任何Watcher??蛻舳双@取到所有的子節(jié)點path之后,如果發(fā)現(xiàn)自己在之前創(chuàng)建的子節(jié)點序號最小,那么就認(rèn)為該客戶端獲取到了鎖。如果發(fā)現(xiàn)自己創(chuàng)建的節(jié)點并非locker所有子節(jié)點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節(jié)點,然后對其調(diào)用exist()方法,同時對其注冊事件監(jiān)聽器。之后,讓這個被關(guān)注的節(jié)點刪除,則客戶端的Watcher會收到相應(yīng)通知,此時再次判斷自己創(chuàng)建的節(jié)點是否是locker子節(jié)點中序號最小的,如皋是則獲取到了鎖,如果不是則重復(fù)以上步驟繼續(xù)獲取到比自己小的一個節(jié)點并注冊監(jiān)聽。當(dāng)前這個過程中還需要許多的邏輯判斷。
2.獲取分布式鎖的核心算法流程
下面同個一個流程圖來分析獲取分布式鎖的完整算法,如下:

解釋:客戶端A要獲取分布式鎖的時候首先到locker下創(chuàng)建一個臨時順序節(jié)點(node_n),然后立即獲取locker下的所有(一級)子節(jié)點。
此時因為會有多個客戶端同一時間爭取鎖,因此locker下的子節(jié)點數(shù)量就會大于1。對于順序節(jié)點,特點是節(jié)點名稱后面自動有一個數(shù)字編號,先創(chuàng)建的節(jié)點數(shù)字編號小于后創(chuàng)建的,因此可以將子節(jié)點按照節(jié)點名稱后綴的數(shù)字順序從小到大排序,這樣排在第一位的就是最先創(chuàng)建的順序節(jié)點,此時它就代表了最先爭取到鎖的客戶端!此時判斷最小的這個節(jié)點是否為客戶端A之前創(chuàng)建出來的node_n,如果是則表示客戶端A獲取到了鎖,如果不是則表示鎖已經(jīng)被其它客戶端獲取,因此客戶端A要等待它釋放鎖,也就是等待獲取到鎖的那個客戶端B把自己創(chuàng)建的那個節(jié)點刪除。
此時就通過監(jiān)聽比node_n次小的那個順序節(jié)點的刪除事件來知道客戶端B是否已經(jīng)釋放了鎖,如果是,此時客戶端A再次獲取locker下的所有子節(jié)點,再次與自己創(chuàng)建的node_n節(jié)點對比,直到自己創(chuàng)建的node_n是locker的所有子節(jié)點中順序號最小的,此時表示客戶端A獲取到了鎖!
四、基于Zookeeper的分布式鎖的代碼實現(xiàn)
1.定義分布式鎖接口
定義的分布式鎖接口如下:
public interface DistributedLock {
/**獲取鎖,如果沒有得到就等待*/
public void acquire() throws Exception;
/**
* 獲取鎖,直到超時
* @param time超時時間
* @param unit time參數(shù)的單位
* @return是否獲取到鎖
* @throws Exception
*/
public boolean acquire (long time, TimeUnit unit) throws Exception;
/**
* 釋放鎖
* @throws Exception
*/
public void release() throws Exception;
}
2.定義一個簡單的互斥鎖
定義一個互斥鎖類,實現(xiàn)以上定義的鎖接口,同時繼承一個基類BaseDistributedLock,該基類主要用于與Zookeeper交互,包含一個嘗試獲取鎖的方法和一個釋放鎖。
/**鎖接口的具體實現(xiàn),主要借助于繼承的父類BaseDistributedLock來實現(xiàn)的接口方法
* 該父類是基于Zookeeper實現(xiàn)分布式鎖的具體細(xì)節(jié)實現(xiàn)*/
public class SimpleDistributedLockMutex extends BaseDistributedLock implements DistributedLock {
/*用于保存Zookeeper中實現(xiàn)分布式鎖的節(jié)點,如名稱為locker:/locker,
*該節(jié)點應(yīng)該是持久節(jié)點,在該節(jié)點下面創(chuàng)建臨時順序節(jié)點來實現(xiàn)分布式鎖 */
private final String basePath;
/*鎖名稱前綴,locker下創(chuàng)建的順序節(jié)點例如都以lock-開頭,這樣便于過濾無關(guān)節(jié)點
*這樣創(chuàng)建后的節(jié)點類似:lock-00000001,lock-000000002*/
private staticfinal String LOCK_NAME ="lock-";
/*用于保存某個客戶端在locker下面創(chuàng)建成功的順序節(jié)點,用于后續(xù)相關(guān)操作使用(如判斷)*/
private String ourLockPath;
/**
* 用于獲取鎖資源,通過父類的獲取鎖方法來獲取鎖
* @param time獲取鎖的超時時間
* @param unit time的時間單位
* @return是否獲取到鎖
* @throws Exception
*/
private boolean internalLock (long time, TimeUnit unit) throws Exception {
//如果ourLockPath不為空則認(rèn)為獲取到了鎖,具體實現(xiàn)細(xì)節(jié)見attemptLock的實現(xiàn)
ourLockPath = attemptLock(time, unit);
return ourLockPath !=null;
}
/**
* 傳入Zookeeper客戶端連接對象,和basePath
* @param client Zookeeper客戶端連接對象
* @param basePath basePath是一個持久節(jié)點
*/
public SimpleDistributedLockMutex(ZkClientExt client, String basePath){
/*調(diào)用父類的構(gòu)造方法在Zookeeper中創(chuàng)建basePath節(jié)點,并且為basePath節(jié)點子節(jié)點設(shè)置前綴
*同時保存basePath的引用給當(dāng)前類屬性*/
super(client,basePath,LOCK_NAME);
this.basePath = basePath;
}
/**獲取鎖,直到超時,超時后拋出異常*/
public void acquire() throws Exception {
//-1表示不設(shè)置超時時間,超時由Zookeeper決定
if (!internalLock(-1,null)){
throw new IOException("連接丟失!在路徑:'"+basePath+"'下不能獲取鎖!");
}
}
/**
* 獲取鎖,帶有超時時間
*/
public boolean acquire(long time, TimeUnit unit) throws Exception {
return internalLock(time, unit);
}
/**釋放鎖*/
public void release()throws Exception {
releaseLock(ourLockPath);
}
}
3. 分布式鎖的實現(xiàn)細(xì)節(jié)
獲取分布式鎖的重點邏輯在于BaseDistributedLock,實現(xiàn)了基于Zookeeper實現(xiàn)分布式鎖的細(xì)節(jié)。
public class BaseDistributedLock {
private final ZkClientExt client;
private final String path;
private final String basePath;
private final String lockName;
private static final Integer MAX_RETRY_COUNT = 10;
public BaseDistributedLock(ZkClientExt client, String path, String lockName){
this.client = client;
this.basePath = path;
this.path = path.concat("/").concat(lockName);
this.lockName = lockName;
}
private void deleteOurPath(String ourPath) throws Exception{
client.delete(ourPath);
}
private String createLockNode(ZkClient client, String path) throws Exception{
return client.createEphemeralSequential(path, null);
}
/**
* 獲取鎖的核心方法
* @param startMillis
* @param millisToWait
* @param ourPath
* @return
* @throws Exception
*/
private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{
boolean haveTheLock = false;
boolean doDelete = false;
try{
while ( !haveTheLock ) {
//該方法實現(xiàn)獲取locker節(jié)點下的所有順序節(jié)點,并且從小到大排序
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length()+1);
//計算剛才客戶端創(chuàng)建的順序節(jié)點在locker的所有子節(jié)點中排序位置,如果是排序為0,則表示獲取到了鎖
int ourIndex = children.indexOf(sequenceNodeName);
/*如果在getSortedChildren中沒有找到之前創(chuàng)建的[臨時]順序節(jié)點,這表示可能由于網(wǎng)絡(luò)閃斷而導(dǎo)致
*Zookeeper認(rèn)為連接斷開而刪除了我們創(chuàng)建的節(jié)點,此時需要拋出異常,讓上一級去處理
*上一級的做法是捕獲該異常,并且執(zhí)行重試指定的次數(shù) 見后面的 attemptLock方法 */
if ( ourIndex<0 ){
throw new ZkNoNodeException("節(jié)點沒有找到: " + sequenceNodeName);
}
//如果當(dāng)前客戶端創(chuàng)建的節(jié)點在locker子節(jié)點列表中位置大于0,表示其它客戶端已經(jīng)獲取了鎖
//此時當(dāng)前客戶端需要等待其它客戶端釋放鎖,
boolean isGetTheLock = ourIndex == 0;
//如何判斷其它客戶端是否已經(jīng)釋放了鎖?從子節(jié)點列表中獲取到比自己次小的哪個節(jié)點,并對其建立監(jiān)聽
String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);
if ( isGetTheLock ){
haveTheLock = true;
}else{
//如果次小的節(jié)點被刪除了,則表示當(dāng)前客戶端的節(jié)點應(yīng)該是最小的了,所以使用CountDownLatch來實現(xiàn)等待
String previousSequencePath = basePath .concat( "/" ) .concat( pathToWatch );
final CountDownLatch latch = new CountDownLatch(1);
final IZkDataListener previousListener = new IZkDataListener() {
//次小節(jié)點刪除事件發(fā)生時,讓countDownLatch結(jié)束等待
//此時還需要重新讓程序回到while,重新判斷一次!
public void handleDataDeleted(String dataPath) throws Exception {
latch.countDown();
}
public void handleDataChange(String dataPath, Object data) throws Exception {
// ignore
}
};
try{
//如果節(jié)點不存在會出現(xiàn)異常
client.subscribeDataChanges(previousSequencePath, previousListener);
if ( millisToWait != null ){
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ){
doDelete = true; // timed out - delete our node
break;
}
latch.await(millisToWait, TimeUnit.MICROSECONDS);
}else{
latch.await();
}
}catch ( ZkNoNodeException e ){
//ignore
}finally{
client.unsubscribeDataChanges(previousSequencePath, previousListener);
}
}
}
}catch ( Exception e ){
//發(fā)生異常需要刪除節(jié)點
doDelete = true;
throw e;
}finally{
//如果需要刪除節(jié)點
if ( doDelete ){
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
private String getLockNodeNumber(String str, String lockName) {
int index = str.lastIndexOf(lockName);
if ( index >= 0 ){
index += lockName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
private List<String> getSortedChildren() throws Exception {
try{
List<String> children = client.getChildren(basePath);
Collections.sort(
children,
new Comparator<String>(){
public int compare(String lhs, String rhs){
return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
}
}
);
return children;
}catch(ZkNoNodeException e){
client.createPersistent(basePath, true);
return getSortedChildren();
}
}
protected void releaseLock(String lockPath) throws Exception{
deleteOurPath(lockPath);
}
protected String attemptLock(long time, TimeUnit unit) throws Exception{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
int retryCount = 0;
//網(wǎng)絡(luò)閃斷需要重試一試
while ( !isDone ){
isDone = true;
try{
//createLockNode用于在locker(basePath持久節(jié)點)下創(chuàng)建客戶端要獲取鎖的[臨時]順序節(jié)點
ourPath = createLockNode(client, path);
/**
* 該方法用于判斷自己是否獲取到了鎖,即自己創(chuàng)建的順序節(jié)點在locker的所有子節(jié)點中是否最小
* 如果沒有獲取到鎖,則等待其它客戶端鎖的釋放,并且稍后重試直到獲取到鎖或者超時
*/
hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
}catch ( ZkNoNodeException e ){
if ( retryCount++ < MAX_RETRY_COUNT ){
isDone = false;
}else{
throw e;
}
}
}
if ( hasTheLock ){
return ourPath;
}
return null;
}
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Springboot實現(xiàn)對配置文件中的明文密碼加密詳解
我們在SpringBoot項目當(dāng)中,會把數(shù)據(jù)庫的用戶名密碼等配置直接放在yaml或者properties文件中,這樣維護(hù)數(shù)據(jù)庫的密碼等敏感信息顯然是有一定風(fēng)險的。所以本文為大家整理了對配置文件中的明文密碼加密的方法,希望對大家有所幫助2023-03-03
Java應(yīng)用多機(jī)器部署解決大量定時任務(wù)問題
這篇文章主要介紹了Java應(yīng)用多機(jī)器部署解決大量定時任務(wù)問題,兩臺服務(wù)器同時部署了同一套代碼, 代碼中寫有spring自帶的定時任務(wù),但是每次執(zhí)行定時任務(wù)時只需要一臺機(jī)器去執(zhí)行,需要的朋友可以參考下2019-07-07
MyBatis-Plus實現(xiàn)對查詢結(jié)果進(jìn)行分頁的基本步驟
MyBatis-Plus 是一個 MyBatis 的增強(qiáng)工具,在 MyBatis 的基礎(chǔ)上只做增強(qiáng)不做改變,為簡化開發(fā)、提高效率而生,MyBatis-Plus 支持多種數(shù)據(jù)庫的分頁查詢,其分頁功能是通過 Page 類實現(xiàn)的,本文介紹了使用 MyBatis-Plus 實現(xiàn)分頁查詢的基本步驟,需要的朋友可以參考下2024-08-08
Java把多個對象的list的數(shù)據(jù)合并的方法示例
在Java中合并多個List可以使用遍歷、Stream API或Apache Commons Collections,通過遍歷各個List并將元素添加到新List實現(xiàn)合并,Java 8提供了Stream API,使用Stream.of()和flatMap()可以簡潔地合并List,文中給出了多種方法示例,需要的朋友可以參考下2024-09-09
MyBatis-Plus自動填充功能失效導(dǎo)致的原因及解決
這篇文章主要介紹了MyBatis-Plus自動填充功能失效導(dǎo)致的原因及解決,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02
詳解Spring中singleton?bean如何同時服務(wù)多個請求
這篇文章主要介紹了詳解Spring中singleton?bean如何同時服務(wù)多個請求2023-02-02

