ZooKeeper 實現(xiàn)分布式鎖的方法示例
ZooKeeper 是一個典型的分布式數(shù)據(jù)一致性解決方案,分布式應(yīng)用程序可以基于 ZooKeeper 實現(xiàn)諸如數(shù)據(jù)發(fā)布/訂閱、負(fù)載均衡、分布式協(xié)調(diào)/通知、集群管理、Master 選舉、分布式鎖等功能。
節(jié)點
在介紹 ZooKeeper 分布式鎖前需要先了解一下 ZooKeeper 中節(jié)點(Znode),ZooKeeper 的數(shù)據(jù)存儲數(shù)據(jù)模型是一棵樹(Znode Tree),由斜杠(/)的進(jìn)行分割的路徑,就是一個 Znode(如 /locks/my_lock)。每個 Znode 上都會保存自己的數(shù)據(jù)內(nèi)容,同時還會保存一系列屬性信息。
Znode 又分為以下四種類型:
| 類型 | 描述 |
|---|---|
| 持久節(jié)點 | 節(jié)點創(chuàng)建后,會一直存在,不會因客戶端會話失效而刪除 |
| 持久順序節(jié)點 | 基本特性與持久節(jié)點一致,創(chuàng)建節(jié)點的過程中,ZooKeeper 會在其名字后自動追加一個單調(diào)增長的數(shù)字后綴,作為新的節(jié)點名 |
| 臨時節(jié)點 | 客戶端會話失效或連接關(guān)閉后,該節(jié)點會被自動刪除 |
| 臨時順序節(jié)點 | 基本特性與臨時節(jié)點一致,創(chuàng)建節(jié)點的過程中,ZooKeeper 會在其名字后自動追加一個單調(diào)增長的數(shù)字后綴,作為新的節(jié)點名 |
鎖原理
ZooKeeper 分布式鎖是基于 臨時順序節(jié)點 來實現(xiàn)的,鎖可理解為 ZooKeeper 上的一個節(jié)點,當(dāng)需要獲取鎖時,就在這個鎖節(jié)點下創(chuàng)建一個臨時順序節(jié)點。當(dāng)存在多個客戶端同時來獲取鎖,就按順序依次創(chuàng)建多個臨時順序節(jié)點,但只有排列序號是第一的那個節(jié)點能獲取鎖成功,其他節(jié)點則按順序分別監(jiān)聽前一個節(jié)點的變化,當(dāng)被監(jiān)聽者釋放鎖時,監(jiān)聽者就可以馬上獲得鎖。
而且用臨時順序節(jié)點的另外一個用意是如果某個客戶端創(chuàng)建臨時順序節(jié)點后,自己意外宕機(jī)了也沒關(guān)系,ZooKeeper 感知到某個客戶端宕機(jī)后會自動刪除對應(yīng)的臨時順序節(jié)點,相當(dāng)于自動釋放鎖。

如上圖:ClientA 和 ClientB 同時想獲取鎖,所以都在 locks 節(jié)點下創(chuàng)建了一個臨時節(jié)點 1 和 2,而 1 是當(dāng)前 locks 節(jié)點下排列序號第一的節(jié)點,所以 ClientA 獲取鎖成功,而 ClientB 處于等待狀態(tài),這時 ZooKeeper 中的 2 節(jié)點會監(jiān)聽 1 節(jié)點,當(dāng) 1節(jié)點鎖釋放(節(jié)點被刪除)時,2 就變成了 locks 節(jié)點下排列序號第一的節(jié)點,這樣 ClientB 就獲取鎖成功了。
代碼測試
請確保 ZooKeeper 服務(wù)已啟動,ZooKeeper 的搭建可參考Kafka 集群 中的 ZooKeeper 集群部分
以下是基于 C# 的測試,Java 可使用 Curator 框架,實現(xiàn)原理和上面描述是一致的,有興趣可以看看源碼,應(yīng)該也不難理解。
創(chuàng)建 .NET Core 控制臺程序 Nuget
創(chuàng)建 ZooKeeper Client
private const int CONNECTION_TIMEOUT = 50000;
private const string CONNECTION_STRING = "127.0.0.1:2181";
private ZooKeeper CreateClient()
{
var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance);
Stopwatch sw = new Stopwatch();
sw.Start();
while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT)
{
var state = zooKeeper.getState();
if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING)
{
break;
}
}
sw.Stop();
return zooKeeper;
}
class NullWatcher : Watcher
{
public static readonly NullWatcher Instance = new NullWatcher();
private NullWatcher() { }
public override Task process(WatchedEvent @event)
{
return Task.CompletedTask;
}
}
添加 Lock 方法
/// <summary>
/// 加鎖
/// </summary>
/// <param name="key">加鎖的節(jié)點名</param>
/// <param name="lockAcquiredAction">加鎖成功后需要執(zhí)行的邏輯</param>
/// <param name="lockReleasedAction">鎖釋放后需要執(zhí)行的邏輯,可為空</param>
/// <returns></returns>
public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null)
{
// 獲取 ZooKeeper Client
ZooKeeper keeper = CreateClient();
// 指定鎖節(jié)點
WriteLock writeLock = new WriteLock(keeper, $"/{key}", null);
var lockCallback = new LockCallback(() =>
{
lockAcquiredAction.Invoke();
writeLock.unlock();
}, lockReleasedAction);
// 綁定鎖獲取和釋放的監(jiān)聽對象
writeLock.setLockListener(lockCallback);
// 獲取鎖(獲取失敗時會監(jiān)聽上一個臨時節(jié)點)
await writeLock.Lock();
}
class LockCallback : LockListener
{
private readonly Action _lockAcquiredAction;
private readonly Action _lockReleasedAction;
public LockCallback(Action lockAcquiredAction, Action lockReleasedAction)
{
_lockAcquiredAction = lockAcquiredAction;
_lockReleasedAction = lockReleasedAction;
}
/// <summary>
/// 獲取鎖成功回調(diào)
/// </summary>
/// <returns></returns>
public Task lockAcquired()
{
_lockAcquiredAction?.Invoke();
return Task.FromResult(0);
}
/// <summary>
/// 釋放鎖成功回調(diào)
/// </summary>
/// <returns></returns>
public Task lockReleased()
{
_lockReleasedAction?.Invoke();
return Task.FromResult(0);
}
}
多線程模擬測試
static async Task RunAsync()
{
Parallel.For(1, 10, async (i) =>
{
await new ZooKeeprDistributedLock().Lock("locks", () =>
{
Console.WriteLine($"第{i}個請求,獲取鎖成功:{DateTime.Now},線程Id:{Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000); // 業(yè)務(wù)邏輯...
}, () =>
{
Console.WriteLine($"第{i}個請求,釋放鎖成功:{DateTime.Now},線程Id:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine("-------------------------------");
});
});
await Task.CompletedTask;
}

雖然模擬的是多線程并行執(zhí)行,但最終都會依賴鎖的獲取和釋放而串行執(zhí)行實際業(yè)務(wù)邏輯。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
C#中BitmapImage與BitmapSource接口的區(qū)別對比小結(jié)
BitmapImage和BitmapSource都可以用于表示和顯示圖像,本文就來介紹一下C#中BitmapImage與BitmapSource接口的區(qū)別對比,具有一定的參考價值,感興趣的可以了解一下2024-03-03
DevExpress實現(xiàn)GridControl根據(jù)列選中一行
這篇文章主要介紹了DevExpress實現(xiàn)GridControl根據(jù)列選中一行,比較實用的功能,需要的朋友可以參考下2014-08-08
C#中Hashtable和Dictionary的區(qū)別與用法示例
由于 Hashtable 和 Dictionary 同時存在, 在使用場景上必然存在選擇性, 并不任何時刻都能相互替代。所以這篇文章主要給大家介紹了關(guān)于C#中Hashtable和Dictionary區(qū)別的相關(guān)資料,需要的朋友可以參考下2021-05-05
jQuery結(jié)合C#實現(xiàn)上傳文件的方法
這篇文章主要介紹了jQuery結(jié)合C#實現(xiàn)上傳文件的方法,涉及C#文件上傳的相關(guān)技巧,需要的朋友可以參考下2015-04-04

