zookeeper實(shí)戰(zhàn)之實(shí)現(xiàn)分布式鎖的方法
一、分布式鎖的通用實(shí)現(xiàn)思路
分布式鎖的概念以及常規(guī)解決方案可以參考之前的博客:聊聊分布式鎖的解決方案;今天我們先分析下分布式鎖的實(shí)現(xiàn)思路;
- 首先,需要保證唯一性,即某一時(shí)點(diǎn)只能有一個(gè)線程訪問某一資源;比方說待辦短信通知功能,每天早上九點(diǎn)短信提醒所有工單的處理人處理工單,假設(shè)服務(wù)部署了20個(gè)容器,那么早上九點(diǎn)的時(shí)候會(huì)有20個(gè)線程啟動(dòng)準(zhǔn)備發(fā)送短信,此時(shí)我們只能讓一個(gè)線程執(zhí)行短信發(fā)送,否則用戶會(huì)收到20條相同的短信;
- 其次,需要考慮下何時(shí)應(yīng)該釋放鎖?這又分三種情況,一是拿到鎖的線程正常結(jié)束,另一種是獲取鎖的線程異常退出,還有種是獲取鎖的線程一直阻塞;第一種情況直接釋放即可,第二種情況可以通過定義下鎖的過期時(shí)間然后通過定時(shí)任務(wù)去釋放鎖;zk的話直接通過臨時(shí)節(jié)點(diǎn)即可;最后一種阻塞的情況也可以通過定時(shí)任務(wù)來釋放,但是需要根據(jù)業(yè)務(wù)來綜合判斷,如果業(yè)務(wù)本身就是長(zhǎng)時(shí)間耗時(shí)的操作那么鎖的過期時(shí)間就得設(shè)置的久一點(diǎn)
- 最后,當(dāng)拿到鎖的線程釋放鎖的時(shí)候,如何通知其他線程可以搶鎖了呢
這里簡(jiǎn)單介紹兩種解決方案,一種是所有需要鎖的線程主動(dòng)輪詢,固定時(shí)間去訪問下看鎖是否釋放,但是這種方案無端增加服務(wù)器壓力并且時(shí)效性無法保證;另一種就是zk的watch,監(jiān)聽鎖所在的目錄,一有變化立馬得到通知
二、ZK實(shí)現(xiàn)分布式鎖的思路
zk通過每個(gè)線程在同一父目錄下創(chuàng)建臨時(shí)有序節(jié)點(diǎn),然后通過比較節(jié)點(diǎn)的id大小來實(shí)現(xiàn)分布式鎖功能;再通過zk的watch機(jī)制實(shí)時(shí)獲取節(jié)點(diǎn)的狀態(tài),如果被刪除立即重新爭(zhēng)搶鎖;具體流程見線圖:

提示:需要關(guān)注下圖里判斷自身不是最小節(jié)點(diǎn)時(shí)的監(jiān)聽情況,為什么不監(jiān)聽父節(jié)點(diǎn)?原因圖里已有描述,這里就不再贅述
三、ZK實(shí)現(xiàn)分布式鎖的編碼實(shí)現(xiàn)
1、核心工具類實(shí)現(xiàn)
通過不斷的調(diào)試,我封裝了一個(gè)ZkLockHelper類,里面封裝了上鎖和釋放鎖的方法,為了方便我將zk的一些監(jiān)聽和回調(diào)機(jī)智也融合到一起了,并沒有抽出來,下面貼上該類的全部代碼
package com.darling.service.zookeeper.lock;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.platform.commons.util.StringUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
/**
* @description:
* @author: dll
* @date: Created in 2022/11/4 8:41
* @version:
* @modified By:
*/
@Data
@Slf4j
public class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {
private final String lockPath = "/lockItem";
ZooKeeper zkClient;
String threadName;
CountDownLatch cd = new CountDownLatch(1);
private String pathName;
/**
* 上鎖
*/
public void tryLock() {
try {
log.info("線程:{}正在創(chuàng)建節(jié)點(diǎn)",threadName);
zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA");
log.info("線程:{}正在阻塞......",threadName);
// 由于上面是異步創(chuàng)建所以這里需要阻塞住當(dāng)前線程
cd.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 釋放鎖
*/
public void unLock() {
try {
zkClient.delete(pathName,-1);
System.out.println(threadName + " 工作結(jié)束....");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* create方法的回調(diào),創(chuàng)建成功后在此處獲取/DCSLock的子目錄,比較節(jié)點(diǎn)ID是否最小,是則拿到鎖。。。
* @param rc 狀態(tài)碼
* @param path create方法的path入?yún)?
* @param ctx create方法的上下文入?yún)?
* @param name 創(chuàng)建成功的臨時(shí)有序節(jié)點(diǎn)的名稱,即在path的后面加上了zk維護(hù)的自增ID;
* 注意如果創(chuàng)建的不是有序節(jié)點(diǎn),那么此處的name和path的內(nèi)容一致
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name);
if (StringUtils.isNotBlank(name)) {
try {
pathName = name ;
// 此處path需注意要寫/
zkClient.getChildren("/", false,this,"123");
// List<String> children = zkClient.getChildren("/", false);
// log.info(">>>>>threadName:{},children:{}",threadName,children);
// // 給children排序
// Collections.sort(children);
// int i = children.indexOf(pathName.substring(1));
// // 判斷自身是否第一個(gè)
// if (Objects.equals(i,0)) {
// // 是第一個(gè)則表示搶到了鎖
// log.info("線程{}搶到了鎖",threadName);
// cd.countDown();
// }else {
// // 表示沒搶到鎖
// log.info("線程{}搶鎖失敗,重新注冊(cè)監(jiān)聽器",threadName);
// zkClient.exists("/"+children.get(i-1),this,this,"AAA");
// }
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* exists方法的回調(diào),此處暫不做處理
* @param rc
* @param path
* @param ctx
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
}
/**
* exists的watch監(jiān)聽
* @param event
*/
@Override
public void process(WatchedEvent event) {
//如果第一個(gè)線程鎖釋放了,等價(jià)于第一個(gè)線程刪除了節(jié)點(diǎn),此時(shí)只有第二個(gè)線程會(huì)監(jiān)控的到
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zkClient.getChildren("/", false,this,"123");
// // 此處path需注意要寫"/"
// List<String> children = null;
// try {
// children = zkClient.getChildren("/", false);
// } catch (KeeperException e) {
// e.printStackTrace();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// log.info(">>>>>threadName:{},children:{}",threadName,children);
// // 給children排序
// Collections.sort(children);
// int i = children.indexOf(pathName.substring(1));
// // 判斷自身是否第一個(gè)
// if (Objects.equals(i,0)) {
// // 是第一個(gè)則表示搶到了鎖
// log.info("線程{}搶到了鎖",threadName);
// cd.countDown();
// }else {
// /**
// * 表示沒搶到鎖;需要判斷前置節(jié)點(diǎn)存不存在,其實(shí)這里并不是特別關(guān)心前置節(jié)點(diǎn)存不存在,所以其回調(diào)可以不處理;
// * 但是這里關(guān)注的前置節(jié)點(diǎn)的監(jiān)聽,當(dāng)前置節(jié)點(diǎn)監(jiān)聽到被刪除時(shí)就是其他線程搶鎖之時(shí)
// */
// zkClient.exists("/"+children.get(i-1),this,this,"AAA");
// }
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
}
}
/**
* getChildren方法的回調(diào)
* @param rc
* @param path
* @param ctx
* @param children
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
try {
log.info(">>>>>threadName:{},children:{}", threadName, children);
if (Objects.isNull(children)) {
return;
}
// 給children排序
Collections.sort(children);
int i = children.indexOf(pathName.substring(1));
// 判斷自身是否第一個(gè)
if (Objects.equals(i, 0)) {
// 是第一個(gè)則表示搶到了鎖
log.info("線程{}搶到了鎖", threadName);
cd.countDown();
} else {
// 表示沒搶到鎖
log.info("線程{}搶鎖失敗,重新注冊(cè)監(jiān)聽器", threadName);
/**
* 表示沒搶到鎖;需要判斷前置節(jié)點(diǎn)存不存在,其實(shí)這里并不是特別關(guān)心前置節(jié)點(diǎn)存不存在,所以其回調(diào)可以不處理;
* 但是這里關(guān)注的前置節(jié)點(diǎn)的監(jiān)聽,當(dāng)前置節(jié)點(diǎn)監(jiān)聽到被刪除時(shí)就是其他線程搶鎖之時(shí)
*/
zkClient.exists("/" + children.get(i - 1), this, this, "AAA");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
提示:代碼中注釋的代碼塊可以關(guān)注下,原本是直接阻塞式編程,將獲取所有子節(jié)點(diǎn)并釋放鎖的操作直接寫在getChildren方法的回調(diào)里,后來發(fā)現(xiàn)當(dāng)節(jié)點(diǎn)被刪除時(shí)我們還要重新?lián)屾i,那么代碼就冗余了,于是結(jié)合響應(yīng)式編程的思想,將這段核心代碼放到
getChildren方法的回調(diào)里,這樣代碼簡(jiǎn)潔了并且可以讓業(yè)務(wù)更只關(guān)注于getChildren這件事了
2、測(cè)試代碼編寫
線程安全問題復(fù)現(xiàn)
package com.darling.service.zookeeper.lock;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
/**
* @description: 開啟是個(gè)線程給i做遞減操作,未加鎖的情況下會(huì)有線程安全問題
* @author: dll
* @date: Created in 2022/11/8 8:32
* @version:
* @modified By:
*/
@Slf4j
public class ZkLockTest02 {
private int i = 10;
@Test
public void test() throws InterruptedException {
for (int n = 0; n < 10; n++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(100);
incre();
}
}).start();
}
Thread.sleep(5000);
log.info("i = {}",i);
}
/**
* i遞減 線程不安全
*/
public void incre(){
// i.incrementAndGet();
log.info("當(dāng)前線程:{},i = {}",Thread.currentThread().getName(),i--);
}
}
上面代碼運(yùn)行結(jié)果如下:

使用上面封裝的ZkLockHelper實(shí)現(xiàn)的分布式鎖
package com.darling.service.zookeeper.lock;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @description: 使用zk實(shí)現(xiàn)的分布式鎖解決線程安全問題
* @author: dll
* @date: Created in 2022/11/8 8:32
* @version:
* @modified By:
*/
@Slf4j
public class ZkLockTest03 {
ZooKeeper zkClient;
@Before
public void conn (){
zkClient = ZkUtil.getZkClient();
}
@After
public void close (){
try {
zkClient.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private int i = 10;
@Test
public void test() throws InterruptedException {
for (int n = 0; n < 10; n++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(100);
ZkLockHelper zkHelper = new ZkLockHelper();
// 這里給zkHelper設(shè)置threadName是為了后續(xù)調(diào)試的時(shí)候日志打印,便于觀察存在的問題
String threadName = Thread.currentThread().getName();
zkHelper.setThreadName(threadName);
zkHelper.setZkClient(zkClient);
// tryLock上鎖
zkHelper.tryLock();
incre();
log.info("線程{}正在執(zhí)行業(yè)務(wù)代碼...",threadName);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 釋放鎖
zkHelper.unLock();
}
}).start();
}
while (true) {
}
}
/**
* i遞減 線程不安全
*/
public void incre(){
// i.incrementAndGet();
log.info("☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆當(dāng)前線程:{},i = {}",Thread.currentThread().getName(),i--);
}
}
運(yùn)行結(jié)果如下:

由于日志中摻雜著zk的日志所有此處并未截全,但是也能看到i是在按規(guī)律遞減的,不會(huì)出現(xiàn)通過線程拿到相同值的情況
四、zk實(shí)現(xiàn)分布式鎖的優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 集群部署不存在單點(diǎn)故障問題
- 統(tǒng)一視圖
zk集群每個(gè)節(jié)點(diǎn)對(duì)外提供的數(shù)據(jù)是一致的,數(shù)據(jù)一致性有所報(bào)障 - 臨時(shí)有序節(jié)點(diǎn)
zk提供臨時(shí)有序節(jié)點(diǎn),這樣當(dāng)客戶端失去連接時(shí)會(huì)自動(dòng)釋放鎖,不用像其他方案一樣當(dāng)拿到鎖的實(shí)例服務(wù)不可用時(shí),需要定時(shí)任務(wù)去刪除鎖;臨時(shí)節(jié)點(diǎn)的特性就是當(dāng)客戶端失去連接會(huì)自動(dòng)刪除 - watch能力加持
當(dāng)獲取不到鎖時(shí),無需客戶端定期輪詢爭(zhēng)搶,只需watch前一節(jié)點(diǎn)即可,當(dāng)有變化時(shí)會(huì)及時(shí)通知,比普通方案即及時(shí)又高效;注意這里最好只watch前一節(jié)點(diǎn),如果watch整個(gè)父目錄的話,當(dāng)客戶端并發(fā)較大時(shí)會(huì)不斷有請(qǐng)求進(jìn)出zk,給zk性能帶來壓力
缺點(diǎn)
- 與單機(jī)版redis比較的話性能肯定較差,但是當(dāng)客戶端集群足夠龐大且業(yè)務(wù)量足夠多時(shí)肯定還是集群更加穩(wěn)定
好了,zk實(shí)現(xiàn)分布式鎖的編碼實(shí)現(xiàn)就到這了,后續(xù)有時(shí)間再寫偏redis的,其實(shí)思路縷清了,編碼實(shí)現(xiàn)還是很簡(jiǎn)單的
到此這篇關(guān)于zookeeper實(shí)戰(zhàn)之實(shí)現(xiàn)分布式鎖的方法的文章就介紹到這了,更多相關(guān)zookeeper分布式鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Nacos-SpringBoot框架啟動(dòng)不加載bootstrap.yml的解決
這篇文章主要介紹了Nacos-SpringBoot框架啟動(dòng)不加載bootstrap.yml的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-11-11
Java計(jì)時(shí)新姿勢(shì)StopWatch的使用方法詳解
這篇文章主要給大家介紹了關(guān)于Java計(jì)時(shí)新姿勢(shì)StopWatch的相關(guān)資料,以及java 中使用StopWatch來計(jì)算時(shí)間差的方法,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2022-01-01
詳談StringUtils3之StringUtils.isEmpty()和StringUtils.isB的區(qū)別
這篇文章主要介紹了詳談StringUtils3之StringUtils.isEmpty()和StringUtils.isB的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
Java基于正則表達(dá)式獲取指定HTML標(biāo)簽指定屬性值的方法
這篇文章主要介紹了Java基于正則表達(dá)式獲取指定HTML標(biāo)簽指定屬性值的方法,涉及java基于正則的HTML元素匹配相關(guān)操作技巧,需要的朋友可以參考下2017-01-01
SpringSecurity導(dǎo)致SpringBoot跨域失效的問題解決
本文主要介紹了SpringSecurity導(dǎo)致SpringBoot跨域失效的問題解決,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01

