ZooKeeper入門教程三分布式鎖實(shí)現(xiàn)及完整運(yùn)行源碼
ZooKeeper入門教程二在單機(jī)和集群環(huán)境下的安裝搭建及使用
1.0版本
首先我們先介紹一個(gè)簡(jiǎn)單的zookeeper實(shí)現(xiàn)分布式鎖的思路:
用zookeeper中一個(gè)臨時(shí)節(jié)點(diǎn)代表鎖,比如在/exlusive_lock下創(chuàng)建臨時(shí)子節(jié)點(diǎn)/exlusive_lock/lock。
- 所有客戶端爭(zhēng)相創(chuàng)建此節(jié)點(diǎn),但只有一個(gè)客戶端創(chuàng)建成功。
- 創(chuàng)建成功代表獲取鎖成功,此客戶端執(zhí)行業(yè)務(wù)邏輯
- 未創(chuàng)建成功的客戶端,監(jiān)聽/exlusive_lock變更
- 獲取鎖的客戶端執(zhí)行完成后,刪除/exlusive_lock/lock,表示鎖被釋放
- 鎖被釋放后,其他監(jiān)聽/exlusive_lock變更的客戶端得到通知,再次爭(zhēng)相創(chuàng)建臨時(shí)子節(jié)點(diǎn)/exlusive_lock/lock。此時(shí)相當(dāng)于回到了第2步。
我們的程序按照上述邏輯直至搶占到鎖,執(zhí)行完業(yè)務(wù)邏輯。
上述是較為簡(jiǎn)單的分布式鎖實(shí)現(xiàn)方式。能夠應(yīng)付一般使用場(chǎng)景,但存在著如下兩個(gè)問題:
1、鎖的獲取順序和最初客戶端爭(zhēng)搶順序不一致,這不是一個(gè)公平鎖。每次鎖獲取都是當(dāng)次最先搶到鎖的客戶端。
2、羊群效應(yīng),所有沒有搶到鎖的客戶端都會(huì)監(jiān)聽/exlusive_lock變更。當(dāng)并發(fā)客戶端很多的情況下,所有的客戶端都會(huì)接到通知去爭(zhēng)搶鎖,此時(shí)就出現(xiàn)了羊群效應(yīng)。
為了解決上面的問題,我們重新設(shè)計(jì)。
2.0版本
我們?cè)?.0版本中,讓每個(gè)客戶端在/exlusive_lock下創(chuàng)建的臨時(shí)節(jié)點(diǎn)為有序節(jié)點(diǎn),這樣每個(gè)客戶端都在/exlusive_lock下有自己對(duì)應(yīng)的鎖節(jié)點(diǎn),而序號(hào)排在最前面的節(jié)點(diǎn),代表對(duì)應(yīng)的客戶端獲取鎖成功。排在后面的客戶端監(jiān)聽自己前面一個(gè)節(jié)點(diǎn),那么在他前序客戶端執(zhí)行完成后,他將得到通知,獲得鎖成功。邏輯修改如下:
- 每個(gè)客戶端往/exlusive_lock下創(chuàng)建有序臨時(shí)節(jié)點(diǎn)/exlusive_lock/lock_。創(chuàng)建成功后/exlusive_lock下面會(huì)有每個(gè)客戶端對(duì)應(yīng)的節(jié)點(diǎn),如/exlusive_lock/lock_000000001
- 客戶端取得/exlusive_lock下子節(jié)點(diǎn),并進(jìn)行排序,判斷排在最前面的是否為自己。如果自己的鎖節(jié)點(diǎn)在第一位,代表獲取鎖成功,此客戶端執(zhí)行業(yè)務(wù)邏輯
- 如果自己的鎖節(jié)點(diǎn)不在第一位,則監(jiān)聽自己前一位的鎖節(jié)點(diǎn)。例如,自己鎖節(jié)點(diǎn)lock_000000002,那么則監(jiān)聽lock_000000001.
- 當(dāng)前一位鎖節(jié)點(diǎn)(lock_000000001)對(duì)應(yīng)的客戶端執(zhí)行完成,釋放了鎖,將會(huì)觸發(fā)監(jiān)聽客戶端(lock_000000002)的邏輯。
- 監(jiān)聽客戶端重新執(zhí)行第2步邏輯,判斷自己是否獲得了鎖。
如此修改后,每個(gè)客戶端只關(guān)心自己前序鎖是否釋放,所以每次只會(huì)有一個(gè)客戶端得到通知。而且,所有客戶端的執(zhí)行順序和最初鎖創(chuàng)建的順序是一致的。解決了1.0版本的兩個(gè)問題。
接下來我們看看代碼如何實(shí)現(xiàn)。
LockSample類
此類是分布式鎖類,實(shí)現(xiàn)了2個(gè)分布式鎖的相關(guān)方法:
1、獲取鎖
2、釋放鎖
主要程序邏輯圍繞著這兩個(gè)方法的實(shí)現(xiàn),特別是獲取鎖的邏輯。我們先看一下該類的成員變量:
private ZooKeeper zkClient; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath;
定義了zkClient,用來操作zookeeper。
鎖的根路徑,及自增節(jié)點(diǎn)的前綴。此處生產(chǎn)環(huán)境應(yīng)該由客戶端傳入。
當(dāng)前鎖的路徑。
構(gòu)造方法
public LockSample() throws IOException {
zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()== Event.KeeperState.Disconnected){
System.out.println("失去連接");
}
}
});
}創(chuàng)建zkClient,同時(shí)創(chuàng)建了狀態(tài)監(jiān)聽。此監(jiān)聽可以去掉,這里只是打印出失去連接狀態(tài)。
獲取鎖實(shí)現(xiàn)
暴露出來的獲取鎖的方法為acquireLock(),邏輯很簡(jiǎn)單:
public void acquireLock() throws InterruptedException, KeeperException {
//創(chuàng)建鎖節(jié)點(diǎn)
createLock();
//嘗試獲取鎖
attemptLock();
}首先創(chuàng)建鎖節(jié)點(diǎn),然后嘗試去取鎖。真正的邏輯都在這兩個(gè)方法中。
createLock()
先判斷鎖的根節(jié)點(diǎn)/Locks是否存在,不存在的話創(chuàng)建。然后在/Locks下創(chuàng)建有序臨時(shí)節(jié)點(diǎn),并設(shè)置當(dāng)前的鎖路徑變量lockPath。
代碼如下:
private void createLock() throws KeeperException, InterruptedException {
//如果根節(jié)點(diǎn)不存在,則創(chuàng)建根節(jié)點(diǎn)
Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 創(chuàng)建EPHEMERAL_SEQUENTIAL類型節(jié)點(diǎn)
String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + " 鎖創(chuàng)建: " + lockPath);
this.lockPath=lockPath;
}attemptLock()
這是最核心的方法,客戶端嘗試去獲取鎖,是對(duì)2.0版本邏輯的實(shí)現(xiàn),這里就不再重復(fù)邏輯,直接看代碼:
private void attemptLock() throws KeeperException, InterruptedException {
// 獲取Lock所有子節(jié)點(diǎn),按照節(jié)點(diǎn)序號(hào)排序
List<String> lockPaths = null;
lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
Collections.sort(lockPaths);
int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
// 如果lockPath是序號(hào)最小的節(jié)點(diǎn),則獲取鎖
if (index == 0) {
System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath);
return ;
} else {
// lockPath不是序號(hào)最小的節(jié)點(diǎn),監(jiān)聽前一個(gè)節(jié)點(diǎn)
String preLockPath = lockPaths.get(index - 1);
Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
// 假如前一個(gè)節(jié)點(diǎn)不存在了,比如說執(zhí)行完畢,或者執(zhí)行節(jié)點(diǎn)掉線,重新獲取鎖
if (stat == null) {
attemptLock();
} else { // 阻塞當(dāng)前進(jìn)程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock
System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath);
synchronized (watcher) {
watcher.wait();
}
attemptLock();
}
}
}注意這一行代碼
Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
我們?cè)讷@取前一個(gè)節(jié)點(diǎn)的時(shí)候,同時(shí)設(shè)置了監(jiān)聽watcher。如果前鎖存在,則阻塞主線程。
watcher定義代碼如下:
private Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath() + " 前鎖釋放");
synchronized (this) {
notifyAll();
}
}
};watcher只是notifyAll,讓主線程繼續(xù)執(zhí)行,以便再次調(diào)用attemptLock(),去嘗試獲取lock。如果沒有異常情況的話,此時(shí)當(dāng)前客戶端應(yīng)該能夠成功獲取鎖。
釋放鎖實(shí)現(xiàn)
釋放鎖原語實(shí)現(xiàn)很簡(jiǎn)單,參照releaseLock()方法。代碼如下:
public void releaseLock() throws KeeperException, InterruptedException {
zkClient.delete(lockPath, -1);
zkClient.close();
System.out.println(" 鎖釋放:" + lockPath);
}關(guān)于分布式鎖的代碼到此就講解完了,我們?cè)倏聪驴蛻舳巳绾问褂盟?/p>
我們創(chuàng)建一個(gè)TicketSeller類,作為客戶端來使用分布式鎖。
TicketSeller類
sell()
不帶鎖的業(yè)務(wù)邏輯方法,代碼如下:
private void sell(){
System.out.println("售票開始");
// 線程隨機(jī)休眠數(shù)毫秒,模擬現(xiàn)實(shí)中的費(fèi)時(shí)操作
int sleepMillis = (int) (Math.random() * 2000);
try {
//代表復(fù)雜邏輯執(zhí)行了一段時(shí)間
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("售票結(jié)束");
}僅是為了演示,sleep了一段時(shí)間。
sellTicketWithLock()
此方法中,加鎖后執(zhí)行業(yè)務(wù)邏輯,代碼如下:
public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
LockSample lock = new LockSample();
lock.acquireLock();
sell();
lock.releaseLock();
}測(cè)試入口
接下來我們寫一個(gè)main函數(shù)做測(cè)試:
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
TicketSeller ticketSeller = new TicketSeller();
for(int i=0;i<1000;i++){
ticketSeller.sellTicketWithLock();
}
}main函數(shù)中我們循環(huán)調(diào)用ticketSeller.sellTicketWithLock(),執(zhí)行加鎖后的賣票邏輯。
測(cè)試方法
1、先啟動(dòng)一個(gè)java程序運(yùn)行,可以看到日志輸出如下:
main 鎖創(chuàng)建: /Locks/Lock_0000000391 main 鎖獲得, lockPath: /Locks/Lock_0000000391 售票開始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000391 main 鎖創(chuàng)建: /Locks/Lock_0000000392 main 鎖獲得, lockPath: /Locks/Lock_0000000392 售票開始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000392 main 鎖創(chuàng)建: /Locks/Lock_0000000393 main 鎖獲得, lockPath: /Locks/Lock_0000000393 售票開始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000393
可見每次執(zhí)行都是按照鎖的順序執(zhí)行,而且由于只有一個(gè)進(jìn)程,并沒有鎖的爭(zhēng)搶發(fā)生。
2、我們?cè)賳?dòng)一個(gè)同樣的程序,鎖的爭(zhēng)搶此時(shí)發(fā)生了,可以看到雙方的日志輸出如下:
程序1:
main 鎖獲得, lockPath: /Locks/Lock_0000000471 售票開始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000471 main 鎖創(chuàng)建: /Locks/Lock_0000000473 等待前鎖釋放,prelocakPath:Lock_0000000472 /Locks/Lock_0000000472 前鎖釋放 main 鎖獲得, lockPath: /Locks/Lock_0000000473 售票開始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000473
可以看到Lock_0000000471執(zhí)行完成后,該進(jìn)程獲取的鎖為L(zhǎng)ock_0000000473,這說明Lock_0000000472被另外一個(gè)進(jìn)程創(chuàng)建了。此時(shí)Lock_0000000473在等待前鎖釋放。Lock_0000000472釋放后,Lock_0000000473才獲得鎖,然后才執(zhí)行業(yè)務(wù)邏輯。
我們?cè)倏闯绦?的日志:
main 鎖獲得, lockPath: /Locks/Lock_0000000472 售票開始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000472 main 鎖創(chuàng)建: /Locks/Lock_0000000474 等待前鎖釋放,prelocakPath:Lock_0000000473 /Locks/Lock_0000000473 前鎖釋放 main 鎖獲得, lockPath: /Locks/Lock_0000000474 售票開始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000474
可以看到,確實(shí)是進(jìn)程2獲取了Lock_0000000472。
zookeeper實(shí)現(xiàn)分布式鎖就先講到這。注意代碼只做演示用,并不適合生產(chǎn)環(huán)境使用。
代碼清單如下:
1、LockSample
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class LockSample {
//ZooKeeper配置信息
private ZooKeeper zkClient;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
private String lockPath;
// 監(jiān)控lockPath的前一個(gè)節(jié)點(diǎn)的watcher
private Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath() + " 前鎖釋放");
synchronized (this) {
notifyAll();
}
}
};
public LockSample() throws IOException {
zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()== Event.KeeperState.Disconnected){
System.out.println("失去連接");
}
}
});
}
//獲取鎖的原語實(shí)現(xiàn).
public void acquireLock() throws InterruptedException, KeeperException {
//創(chuàng)建鎖節(jié)點(diǎn)
createLock();
//嘗試獲取鎖
attemptLock();
}
//創(chuàng)建鎖的原語實(shí)現(xiàn)。在lock節(jié)點(diǎn)下創(chuàng)建該線程的鎖節(jié)點(diǎn)
private void createLock() throws KeeperException, InterruptedException {
//如果根節(jié)點(diǎn)不存在,則創(chuàng)建根節(jié)點(diǎn)
Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 創(chuàng)建EPHEMERAL_SEQUENTIAL類型節(jié)點(diǎn)
String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + " 鎖創(chuàng)建: " + lockPath);
this.lockPath=lockPath;
}
private void attemptLock() throws KeeperException, InterruptedException {
// 獲取Lock所有子節(jié)點(diǎn),按照節(jié)點(diǎn)序號(hào)排序
List<String> lockPaths = null;
lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
Collections.sort(lockPaths);
int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
// 如果lockPath是序號(hào)最小的節(jié)點(diǎn),則獲取鎖
if (index == 0) {
System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath);
return ;
} else {
// lockPath不是序號(hào)最小的節(jié)點(diǎn),監(jiān)控前一個(gè)節(jié)點(diǎn)
String preLockPath = lockPaths.get(index - 1);
Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
// 假如前一個(gè)節(jié)點(diǎn)不存在了,比如說執(zhí)行完畢,或者執(zhí)行節(jié)點(diǎn)掉線,重新獲取鎖
if (stat == null) {
attemptLock();
} else { // 阻塞當(dāng)前進(jìn)程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock
System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath);
synchronized (watcher) {
watcher.wait();
}
attemptLock();
}
}
}
//釋放鎖的原語實(shí)現(xiàn)
public void releaseLock() throws KeeperException, InterruptedException {
zkClient.delete(lockPath, -1);
zkClient.close();
System.out.println(" 鎖釋放:" + lockPath);
}
}2、TicketSeller
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class TicketSeller {
private void sell(){
System.out.println("售票開始");
// 線程隨機(jī)休眠數(shù)毫秒,模擬現(xiàn)實(shí)中的費(fèi)時(shí)操作
int sleepMillis = (int) (Math.random() * 2000);
try {
//代表復(fù)雜邏輯執(zhí)行了一段時(shí)間
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("售票結(jié)束");
}
public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
LockSample lock = new LockSample();
lock.acquireLock();
sell();
lock.releaseLock();
}
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
TicketSeller ticketSeller = new TicketSeller();
for(int i=0;i<1000;i++){
ticketSeller.sellTicketWithLock();
}
}
}以上就是ZooKeeper入門教程三分布式鎖實(shí)現(xiàn)及完整運(yùn)行源碼的詳細(xì)內(nèi)容,更多關(guān)于ZooKeeper分布式鎖實(shí)現(xiàn)源碼的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring?Boot集成RabbitMQ以及隊(duì)列模式操作
RabbitMQ是實(shí)現(xiàn)AMQP(高級(jí)消息隊(duì)列協(xié)議)的消息中間件的一種,下面這篇文章主要給大家介紹了關(guān)于Spring?Boot集成RabbitMQ以及隊(duì)列模式操作的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-04-04
Java 8 對(duì) ArrayList 元素進(jìn)行排序的操作方法
Java8提供了多種方式對(duì)ArrayList元素進(jìn)行排序,包括使用Collections.sort()方法、Collections.reverseOrder()實(shí)現(xiàn)降序排序、使用Lambda表達(dá)式進(jìn)行自定義排序、使用StreamAPI對(duì)ArrayList進(jìn)行排序及按對(duì)象屬性排序,本文通過示例代碼介紹的非常詳細(xì),感興趣的朋友一起看看吧2024-11-11
Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(36)
下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,希望可以幫到你2021-07-07
Spring MVC請(qǐng)求參數(shù)的傳遞方式
Spring MVC是一種基于Model-View-Controller(MVC)設(shè)計(jì)模式的輕量級(jí)Web框架,用于Java應(yīng)用程序的開發(fā),在處理HTTP請(qǐng)求時(shí),Spring MVC會(huì)涉及到請(qǐng)求參數(shù)的傳遞,本文給大家介紹了Spring MVC請(qǐng)求參數(shù)的傳遞方式,需要的朋友可以參考下2024-10-10
使用IDEA創(chuàng)建Java Web項(xiàng)目并部署訪問的圖文教程
本文通過圖文并茂的形式給大家介紹了使用IDEA創(chuàng)建Java Web項(xiàng)目并部署訪問的教程,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-08-08
Spring MVC保證Controller并發(fā)安全的方法小結(jié)
在 Spring MVC 中,默認(rèn)情況下,@Controller 是單例的,這意味著所有請(qǐng)求共享一個(gè) Controller 實(shí)例,為確保并發(fā)安全,Spring 并不會(huì)自動(dòng)對(duì) Controller 進(jìn)行線程安全保護(hù),本文給大家介紹了Spring MVC保證Controller并發(fā)安全的方法,需要的朋友可以參考下2024-11-11
IDEA2020 1.1中Plugins加載不出來的問題及解決方法
這篇文章主要介紹了IDEA2020 1.1中Plugins加載不出來的問題,本文還給大家提到了IDEA 2020.1.1 找不到程序包和符號(hào)的問題,感興趣的朋友跟隨小編一起看看吧2020-06-06
SpringBoot使用Redis實(shí)現(xiàn)分布式鎖
這篇文章主要為大家詳細(xì)介紹了SpringBoot使用Redis實(shí)現(xiàn)分布式鎖,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-05-05

