ZooKeeper Java API編程實例分析
本實例我們用的是java3.4.6版本,實例方便大家學(xué)習(xí)完后有不明白的可以在留言區(qū)討論。
開發(fā)應(yīng)用程序的ZooKeeper Java綁定主要由兩個Java包組成:
org.apache.zookeeper
org.apache.zookeeper.data
org.apache.zookeeper包由ZooKeeper監(jiān)視的接口定義和ZooKeeper的各種回調(diào)處理程序組成。 它定義了ZooKeeper客戶端類庫的主要類以及許多ZooKeeper事件類型和狀態(tài)的靜態(tài)定義。 org.apache.zookeeper.data包定義了與數(shù)據(jù)寄存器(也稱為znode)相關(guān)的特性,例如訪問控制列表(ACL),IDs,stats等。
ZooKeeper Java API中的org.apache.zookeeper.server,org.apache.zookeeper.server.quorum和org.apache.zookeeper.server.upgrade包是服務(wù)器實現(xiàn)的一部分。 org.apache.zookeeper.client包用于查詢ZooKeeper服務(wù)器的狀態(tài)。
一 準(zhǔn)備開發(fā)環(huán)境
Apache ZooKeeper是一個復(fù)雜的軟件,因此它需要運行許多其他類庫。 依賴庫作為jar文件在ZooKeeper發(fā)行版中附帶在lib目錄中。 核心ZooKeeper jar文件名字為zookeeper-3.4.6.jar,位于主目錄下。
要開發(fā)Java的ZooKeeper應(yīng)用程序,我們必須設(shè)置指向ZooKeeper jar的類路徑,以及ZooKeeper所依賴的所有第三方庫。在 bin 目錄下有一個 zkEnv.sh文件,可以用來設(shè)置CLASSPATH。
我們需要將腳本如下設(shè)置,在命令行中執(zhí)行以下語句:
$ ZOOBINDIR=${ZK_HOME}/bin
$ source ${ZOOBINDIR}/zkEnv.sh
shell變量ZK_HOME被設(shè)置為安裝ZooKeeper的路徑,在我的設(shè)置中,它是/usr/share/zookeeper。 之后,CLASSPATH變量被正確設(shè)置,在我的系統(tǒng)中,如下所示:
$ echo $CLASSPATH /usr/share/zookeeper-3.4.6/bin/../build/classes :/usr/share/zookeeper-3.4.6/bin/../build/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar :/usr/share/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar :/usr/share/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar :/usr/share/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar :/usr/share/zookeeper-3.4.6/bin/../src/java/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../conf:
在Windows操作系統(tǒng)中,需要運行zkEnv.cmd腳本。 現(xiàn)在可以使用CLASSPATH變量來編譯和運行使用ZooKeeper API編寫的Java程序。 可以在Uni/Linux中的主目錄的.bashrc文件中找到zkEnv.sh腳本,避免每次啟動shell會話時都采用它。
二 第一個ZooKeeper程序
為了引入ZooKeeper Java API,讓我們從一個非常簡單的程序開始,它可以連接到localhost中的ZooKeeper實例,如果連接成功,它將在ZooKeeper名稱空間的根路徑下打印znode的列表。
這個程序的代碼如下所示:
/*Our First ZooKeeper Program*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
public class HelloZooKeeper {
public static void main(String[] args) throws IOException {
String hostPort = "localhost:2181";
String zpath = "/";
List <String> zooChildren = new ArrayList<String>();
ZooKeeper zk = new ZooKeeper(hostPort, 2000, null);
if (zk != null) {
try {
zooChildren = zk.getChildren(zpath, false);
System.out.println("Znodes of '/': ");
for (String child: zooChildren) {
//print the children
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在構(gòu)建和執(zhí)行前面的代碼片段之前,讓我們來看看它具體做了什么。代碼從導(dǎo)入語句開始。使用這些語句,我們導(dǎo)入了程序各個組件所需的包。如前所述,org.apache.zookeeper包包含客戶端與ZooKeeper服務(wù)器進行交互所需的所有類和接口。在導(dǎo)入包之后,定義了一個名為HelloZooKeeper的類。由于我們要連接到在同一系統(tǒng)中運行的ZooKeeper實例,在main方法中將主機和端口字符串定義為localhost:2181。代碼行zk = new ZooKeeper(hostPort, 2000, null)調(diào)用ZooKeeper構(gòu)造方法,該構(gòu)造方法嘗試連接到ZooKeeper服務(wù)器并返回一個引用。對于連接到ZooKeeper服務(wù)器實例并維護該連接的客戶端程序,需要維護一個實時會話。在此例中,構(gòu)造方法實例化的zk對象返回的引用表示這個會話。 ZooKeeper API是圍繞這個引用構(gòu)建的,每個方法調(diào)用都需要一個引用來執(zhí)行。
ZooKeeper類的構(gòu)造方法使用以下代碼創(chuàng)建ZooKeeper實例的引用:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
使用的參數(shù)含義如下:
connectString:以逗號分隔的主機:端口號列表,每個對應(yīng)一個ZooKeeper服務(wù)器。 例如,10.0.0.1:2001,10.0.0.2:2002和10.0.0.3:2003表示三個節(jié)點的ZooKeeper ensemble的有效的主機:端口匹配對。 sessionTimeout:這是以毫秒為單位的會話超時時間。這是ZooKeeper在宣布session結(jié)束之前,沒有從客戶端獲得心跳的時間。 watcher:一個watcher對象,如果創(chuàng)建,當(dāng)狀態(tài)改變和發(fā)生節(jié)點事件時會收到通知。這個watcher對象需要通過一個用戶定義的類單獨創(chuàng)建,通過實現(xiàn)Watcher接口并將實例化的對象傳遞給ZooKeeper構(gòu)造方法??蛻舳藨?yīng)用程序可以收到各種類型的事件的通知,例如連接丟失、會話過期等。
ZooKeeper Java API定義了另外帶有三個參數(shù)的構(gòu)造方法,以指定更高級的操作。代碼如下:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
在ZooKeeper類的上面的構(gòu)造方法中,如果設(shè)置為true,boolean canBeReadOnly參數(shù)允許創(chuàng)建的客戶端在網(wǎng)絡(luò)分區(qū)的情況下進入只讀模式。只讀模式是客戶端無法找到任何多數(shù)服務(wù)器的場景,但有一個可以到達的分區(qū)服務(wù)器,以只讀模式連接到它,這樣就允許對服務(wù)器的讀取請求,而寫入請求則不允許??蛻舳死^續(xù)嘗試在后臺連接到大多數(shù)服務(wù)器,同時仍然保持只讀模式。分區(qū)服務(wù)器僅僅是ZooKeeper組的一個子集,它是由于集群中的網(wǎng)絡(luò)分配而形成的。大多數(shù)服務(wù)器構(gòu)成了ensemble中的大多數(shù)quorum。
以下構(gòu)造方法顯示了兩個附加參數(shù)的定義:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
這個構(gòu)造方法允許ZooKeeper客戶端對象創(chuàng)建兩個額外的參數(shù):
sessionId:在客戶端重新連接到ZooKeeper服務(wù)器的情況下,可以使用特定的會話ID來引用先前連接的會話 sessionPasswd:如果指定的會話需要密碼,可以在這里指定
以下構(gòu)造方法是前兩個調(diào)用的組合:
ZooKeeper(String connectString, int sessionTimeout,Watcher watcher, long sessionId, byte[] sessionPasswd,boolean canBeReadOnly)
此構(gòu)造方法是前兩個調(diào)用的組合,允許在啟用只讀模式的情況下重新連接到指定的會話。
Note
ZooKeeper類的詳細Java API文檔可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上查詢。
現(xiàn)在,回到我們的ZooKeeper程序。 在調(diào)用構(gòu)造方法后,如果連接成功,我們將得到ZooKeeper服務(wù)器的引用。 我們通過下面的代碼將引用傳遞給getChildren方法:
zooChildren = zk.getChildren(zpath, false)
ZooKeeper類的getChildren(String path,boolean watch)方法返回給定路徑上znode的子級列表。 我們只是迭代這個方法返回的列表,并將字符串打印到控制臺。
將程序命名為HelloZooKeeper.java,并編譯我們的程序如下:
$ javac -cp $CLASSPATH HelloZooKeeper.java
在我們運行的程序之前,需要使用以下命令來啟動ZooKeeper服務(wù)器實例:
$ ${ZK_HOME}/bin/zkServer.sh start
運行程序如下:
$ java -cp $CLASSPATH HelloZooKeeper
執(zhí)行程序會在控制臺上打印日志消息,顯示所使用的ZooKeeper版本,Java版本,Java類路徑,服務(wù)器體系結(jié)構(gòu)等等。 這里顯示了這些日志消息的一部分:

ZooKeeper Java API生成的日志消息對調(diào)試非常有用。 它為我們提供了關(guān)于客戶端連接到ZooKeeper服務(wù)器,建立會話等后臺得信息。 上面顯示的最后三條日志消息告訴我們客戶端如何使用程序中指定的參數(shù)來啟動連接,以及在成功連接后,服務(wù)器如何為客戶端分配會話ID。
最后,程序執(zhí)行最后在控制臺中輸出以下內(nèi)容:

我們可以使用ZooKeeper shell來驗證程序的正確性:
$ $ZK_HOME/bin/zkCli.sh -server localhost

恭喜! 我們剛剛成功編寫了我們的第一個ZooKeeper客戶端程序。
二 實現(xiàn)Watcher接口
ZooKeeper Watcher監(jiān)視使客戶端能夠接收來自ZooKeeper服務(wù)器的通知,并在發(fā)生時處理這些事件。 ZooKeeper Java API提供了一個名為Watcher的公共接口,客戶端事件處理程序類必須實現(xiàn)該接口才能接收有關(guān)來自ZooKeeper服務(wù)器的事件通知。 以編程方式,使用這種客戶端的應(yīng)用程序通過向客戶端注冊回調(diào)(callback)對象來處理這些事件。
我們將實現(xiàn)Watcher接口,處理與znode關(guān)聯(lián)的數(shù)據(jù)更改時由ZooKeeper生成的事件。
Watcher接口在org.apache.zookeeper包中聲明如下:
public interface Watcher {
void process(WatchedEvent event);
}
為了演示znode數(shù)據(jù)監(jiān)視器(Watcher),有兩個Java類:DataWatcher和DataUpdater。 DataWatcher將一直運行,并在/MyConfig指定znode路徑中偵聽來自ZooKeeper服務(wù)器的NodeDataChange事件。DataUpdater類將定期更新此znode路徑中的數(shù)據(jù)字段,這將生成事件,并且在接收到這些事件后,DataWatcher類將把更改后的數(shù)據(jù)打印到控制臺上。
以下是DataWatcher.java類的代碼:
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class DataWatcher implements Watcher, Runnable {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
byte zoo_data[] = null;
ZooKeeper zk;
public DataWatcher() {
try {
zk = new ZooKeeper(hostPort, 2000, this);
if (zk != null) {
try {
//Create the znode if it doesn't exist, with the following code:
if (zk.exists(zooDataPath, this) == null) {
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void printData() throws InterruptedException, KeeperException {
zoo_data = zk.getData(zooDataPath, this, null);
String zString = new String(zoo_data);
// The following code prints the current content of the znode to the console:
System.out.printf("\nCurrent Data @ ZK Path %s: %s", zooDataPath, zString);
}
@Override
public void process(WatchedEvent event) {
System.out.printf(
"\nEvent Received: %s", event.toString());
//We will process only events of type NodeDataChanged
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
printData();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args)
throws InterruptedException, KeeperException {
DataWatcher dataWatcher = new DataWatcher();
dataWatcher.printData();
dataWatcher.run();
}
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
我們來看一下DataWatcher.java類的代碼來理解一個ZooKeeper監(jiān)視器的實現(xiàn)。 DataWatcher公共類實現(xiàn)Watcher接口以及Runnable接口,打算將監(jiān)視器作為線程運行。 main方法創(chuàng)建DataWatcher類的一個實例。 在前面的代碼中,DataWatcher構(gòu)造方法嘗試連接到在本地主機上運行的ZooKeeper實例。 如果連接成功,我們用下面的代碼檢查znode路徑/MyConfig是否存在:
if (zk.exists(zooDataPath, this) == null) {
如果znode不存在ZooKeeper命名空間中,那么exists方法調(diào)用將返回null,并且嘗試使用代碼將其創(chuàng)建為持久化znode,如下所示:
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
接下來是process方法,它在org.apache.ZooKeeper的Watcher接口中聲明,并由DataWatcher類使用以下代碼實現(xiàn):
public void process(WatchedEvent event) {
為了簡單起見,在process方法中,打印從ZooKeeper實例接收的事件,并僅對NodeDataChanged類型的事件進行進一步處理,如下所示:
if (event.getType() == Event.EventType.NodeDataChanged)
當(dāng)znode路徑/MyConfig的數(shù)據(jù)字段發(fā)生任何更新或更改而收到NodeDataChanged類型的事件時,調(diào)用printData方法來打印znode的當(dāng)前內(nèi)容。 在znode上執(zhí)行一個getData調(diào)用時,我們再次設(shè)置一個監(jiān)視,這是該方法的第二個參數(shù),如下面的代碼所示:
zoo_data = zk.getData(zooDataPath, this, null);
監(jiān)視事件是發(fā)送給設(shè)置監(jiān)視的客戶端的一次性觸發(fā)器,為了不斷接收進一步的事件通知,客戶端應(yīng)該重置監(jiān)視器。
DataUpdater.java是一個簡單的類,它連接到運行本地主機的ZooKeeper實例,并用隨機字符串更新znode路徑/MyConfig的數(shù)據(jù)字段。 在這里,我們選擇使用通用唯一標(biāo)識符(UUID)字符串更新znode,因為后續(xù)的UUID生成器調(diào)用將保證生成唯一的字符串。
DataUpdater.java類代碼如下:
import java.io.IOException;
import java.util.UUID;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DataUpdater implements Watcher {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
ZooKeeper zk;
public DataUpdater() throws IOException {
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException e) {
e.printStackTrace();
}
}
// updates the znode path /MyConfig every 5 seconds with a new UUID string.
public void run() throws InterruptedException, KeeperException {
while (true) {
String uuid = UUID.randomUUID().toString();
byte zoo_data[] = uuid.getBytes();
zk.setData(zooDataPath, zoo_data, -1);
try {
Thread.sleep(5000); // Sleep for 5 secs
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws
IOException, InterruptedException, KeeperException {
DataUpdater dataUpdater = new DataUpdater();
dataUpdater.run();
}
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
}
}
上面的代碼使ZooKeeper服務(wù)器觸發(fā)一個NodeDataChanged事件。 由于DataWatcher為此znode路徑設(shè)置了監(jiān)視,因此它會接收數(shù)據(jù)更改事件的通知。 然后它檢索更新的數(shù)據(jù),重置監(jiān)視,并在控制臺上打印數(shù)據(jù)。
使用以下命令編譯DataWatcher和DataUpdater類:
$ javac –cp $CLASSPATH DataWatcher.java $ javac –cp $CLASSPATH DataUpdater.java
要執(zhí)行監(jiān)視器和更新程序,需要打開兩個終端窗口。 我要先運行監(jiān)視器,因為它創(chuàng)建了/MyConfig的znode(如果還未在ZooKeeper的命名空間中創(chuàng)建的話)。 運行監(jiān)視器之前,請確保ZooKeeper服務(wù)器在本地主機上已經(jīng)運行。
在其中一個終端窗口中,通過運行以下命令來執(zhí)行watcher類:
$ java –cp $CLASSPATH DataWatcher
輸出類似于以下屏幕截圖所示的消息:

如前面的截圖所示,znode路徑/MyConfig是由DataWatcher類創(chuàng)建的。 它也打印znode的內(nèi)容,但沒有打印在控制臺中,因為我們在創(chuàng)建znode時沒有設(shè)置任何數(shù)據(jù)。 當(dāng)znode被創(chuàng)建時,類中的監(jiān)視者收到了NodeCreated類型的事件通知,這個通知被打印在控制臺中。 DataWatcher類繼續(xù)運行,并從ZooKeeper服務(wù)器偵聽/MyConfig節(jié)點上的事件。
讓我們在另一個終端窗口中運行DataUpdater類:
$ java -cp $CLASSPATH DataUpdater
將最初的ZooKeeper特定日志消息記錄到控制臺后,DataUpdater類運行時沒有提示。 它將一個新的UUID字符串設(shè)置到ZooKeeper路徑/MyConfig的數(shù)據(jù)字段中。 因此,看到每隔5秒鐘,在下面的屏幕截圖中顯示的輸出內(nèi)容打印在運行DataWatche的終端窗口中:

DataWatcher也可以使用ZooKeeper shell進行測試。 繼續(xù)像以前一樣在終端中運行DataWatcher類,并在另一個終端中調(diào)用ZooKeeper shell并運行以下屏幕截圖中所示的命令:

在DataWatcher正在運行的終端中,將打印以下消息:

三 示例——群集監(jiān)視器
通過互聯(lián)網(wǎng)提供的流行服務(wù),如電子郵件,文件服務(wù)平臺,在線游戲等,都是通過跨越多個數(shù)據(jù)中心的高度可用的成百上千臺服務(wù)器來服務(wù)的,而這些服務(wù)器通常在地理位置上分開。 在這種集群中,設(shè)置了一些專用的服務(wù)器節(jié)點來監(jiān)視生產(chǎn)網(wǎng)絡(luò)中承載服務(wù)或應(yīng)用程序的服務(wù)器的活躍性。 在云計算環(huán)境中,也用于管理云環(huán)境的這種監(jiān)控節(jié)點被稱為云控制器。 這些控制器節(jié)點的一個重要工作是實時檢測生產(chǎn)服務(wù)器的故障,并相應(yīng)地通知管理員,并采取必要的措施,例如將故障服務(wù)器上的應(yīng)用程序故障轉(zhuǎn)移到另一個服務(wù)器,從而確保容錯性和高可用性。
在本節(jié)中,我們將使用ZooKeeper Java客戶端API開發(fā)一個簡約的分布式集群監(jiān)視器模型。 使用ZooKeeper的ephemeral znode概念來構(gòu)建這個監(jiān)視模型相當(dāng)簡單和優(yōu)雅,如以下步驟所述:
每個生產(chǎn)服務(wù)器運行一個ZooKeeper客戶端作為守護進程。 這個過程連接到ZooKeeper服務(wù)器,并在/ZooKeeper命名空間的預(yù)定義路徑(比如/Members)下創(chuàng)建一個帶有名稱(最好是其網(wǎng)絡(luò)名稱或主機名)的ephemeral znode。云控制器節(jié)點運行ZooKeeper監(jiān)視器進程,該進程監(jiān)視路徑/Members并監(jiān)聽NodeChildrenChanged類型的事件。 這個監(jiān)視器進程作為服務(wù)或守護進程運行,并設(shè)置或重置路徑上的監(jiān)視,并且實現(xiàn)其邏輯以調(diào)用適當(dāng)?shù)哪K來為監(jiān)視事件采取必要的行動?,F(xiàn)在,如果生產(chǎn)服務(wù)器由于硬件故障或軟件崩潰而關(guān)閉,ZooKeeper客戶端進程就會被終止,導(dǎo)致服務(wù)器和ZooKeeper服務(wù)之間的會話被終止。 由于ephemeral znode的屬性唯一,每當(dāng)客戶端連接關(guān)閉時,ZooKeeper服務(wù)會自動刪除路徑/Members中的znode。路徑中znode的刪除引發(fā)了NodeChildrenChanged事件,因此云控制器中的觀察器進程會收到通知。 通過調(diào)用路徑/Members中的getChildren方法,可以確定哪個服務(wù)器節(jié)點已經(jīng)關(guān)閉。然后,控制器節(jié)點可以采取適當(dāng)?shù)拇胧?,比如?zhí)行恢復(fù)邏輯以重啟另一臺服務(wù)器中的故障服務(wù)。這個邏輯可以構(gòu)建為實時工作,保證接近于零停機的時間和高度可用的服務(wù)。
為實現(xiàn)這個集群監(jiān)控模型,我們將開發(fā)兩個Java類。 ClusterMonitor類將持續(xù)運行監(jiān)視器,以監(jiān)視ZooKeeper樹中的路徑/Members。 處理完引發(fā)事件后,我們將在控制臺中打印znode列表并重置監(jiān)視。 另一個類ClusterClient將啟動到ZooKeeper服務(wù)器的連接,在/Members下創(chuàng)建一個ephemeral znode。
要模擬具有多個節(jié)點的集群,我們在同一臺計算機上啟動多個客戶端,并使用客戶端進程的進程ID創(chuàng)建ephemeral znode。 通過查看進程標(biāo)識,ClusterMonitor類可以確定哪個客戶進程已經(jīng)關(guān)閉,哪些進程還在。 在實際情況中,客戶端進程通常會使用當(dāng)前正在運行的服務(wù)器的主機名創(chuàng)建ephemeral znode。 下面顯示了這兩個類的源代碼。
ClusterMonitor.java類定義如下:
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterMonitor implements Runnable {
private static String membershipRoot = "/Members";
private final Watcher connectionWatcher;
private final Watcher childrenWatcher;
private ZooKeeper zk;
boolean alive=true;
public ClusterMonitor(String HostPort) throws IOException, InterruptedException, KeeperException {
connectionWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getType()==Watcher.Event.EventType.None && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.printf("\nEvent Received: %s", event.toString());
}
}
};
childrenWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
//Get current list of child znode,
//reset the watch
List<String> children = zk.getChildren( membershipRoot, this);
wall("!!!Cluster Membership Change!!!");
wall("Members: " + children);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
alive = false;
throw new RuntimeException(e);
}
}
}
};
zk = new ZooKeeper(HostPort, 2000, connectionWatcher);
// Ensure the parent znode exists
if(zk.exists(membershipRoot, false) == null) {
zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// Set a watch on the parent znode
List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
System.err.println("Members: " + children);
}
public synchronized void close() {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void wall (String message) {
System.out.printf("\nMESSAGE: %s", message);
}
public void run() {
try {
synchronized (this) {
while (alive) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
if (args.length != 1) {
System.err.println("Usage: ClusterMonitor <Host:Port>");
System.exit(0);
}
String hostPort = args[0];
new ClusterMonitor(hostPort).run();
}
}
ClusterClient.java類定義如下:
import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterClient implements Watcher, Runnable {
private static String membershipRoot = "/Members";
ZooKeeper zk;
public ClusterClient(String hostPort, Long pid) {
String processId = pid.toString();
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException e) {
e.printStackTrace();
}
if (zk != null) {
try {
zk.create(membershipRoot + '/' + processId, processId.getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (
KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void close() {
try {
zk.close();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
}
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) {
if (args.length != 1) {
System.err.println("Usage: ClusterClient <Host:Port>");
System.exit(0);
}
String hostPort = args[0];
//Get the process id
String name = ManagementFactory.getRuntimeMXBean().getName();
int index = name.indexOf('@');
Long processId = Long.parseLong(name.substring(0, index));
new ClusterClient(hostPort, processId).run();
}
}
使用下面命令編譯這兩個類:
$ javac -cp $CLASSPATH ClusterMonitor.java $ javac -cp $CLASSPATH ClusterClient.java
要執(zhí)行群集監(jiān)控模型,打開兩個終端。 在其中一個終端中,運行ClusterMonitor類。 在另一個終端中,通過在后臺運行ClusterClient類來執(zhí)行多個實例。
在第一個終端中,執(zhí)行ClusterMonitor類:
$ java -cp $CLASSPATH ClusterMonitorlocalhost:2181
如前面的示例所示,看到來自客戶端API的調(diào)試日志消息,最后,ClusterMonitor類開始監(jiān)視事件,輸入如下內(nèi)容:

現(xiàn)在,執(zhí)行ClusterClient類的五個實例來模擬一個集群的五個節(jié)點。ClusterClient在ZooKeeper樹的/Members路徑中使用自己的進程ID創(chuàng)建ephemeral znode:
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null & [1] 4028 $ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null & [2] 4045 $ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null & [3] 4057 $ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null & [4] 4072 $ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null & [5] 4084
與此相對應(yīng),將觀察到ClusterMonitor類檢測到這些新的ClusterClient類實例,因為它正在監(jiān)視ZooKeeper樹的/Members路徑上的事件。 這模擬了一個真正的集群中的節(jié)點加入事件。 可以在ClusterMonitor類的終端中看到輸出,這與下面的截圖中顯示的類似:

現(xiàn)在,如果殺死一個ClusterClient.java進程,那么它與ZooKeeper服務(wù)器一起維護的會話將被終止。因此,客戶端創(chuàng)建的ephemeral znode將被刪除。刪除將觸發(fā)NodeChildrenChanged事件,該事件將被ClusterMonitor類捕獲。該模擬在集群中一個節(jié)點離開的場景。
讓我們用ID 4084終止ClusterClient進程:
$ kill -9 4084
以下屏幕截圖顯示了ClusterMonitor類的終端中的輸出。 它列出了當(dāng)前可用的進程及其進程ID,這些進程ID模擬了實時服務(wù)器:

上面的簡單而優(yōu)雅的集群監(jiān)控模型的示例實現(xiàn)展示了ZooKeeper的真正威力。 在沒有ZooKeeper的情況下,開發(fā)這樣一個能夠?qū)崟r監(jiān)控節(jié)點活躍度的模型將是一項真正的艱巨任務(wù)。
相關(guān)文章
java后端返回數(shù)據(jù)給前端時去除值為空或NULL的屬性、忽略某些屬性代碼示例
在Java開發(fā)中我們處理JSON數(shù)據(jù)時經(jīng)常會遇到空值(null)的情況,這篇文章主要給大家介紹了關(guān)于java后端返回數(shù)據(jù)給前端時去除值為空或NULL的屬性、忽略某些屬性的相關(guān)資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-07-07
SpringMvc @RequestParam 使用推薦使用包裝類型代替包裝類型
這篇文章主要介紹了SpringMvc @RequestParam 使用推薦使用包裝類型代替包裝類型,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-02-02
解決Intellij IDEA 使用Spring-boot-devTools無效的問題
下面小編就為大家?guī)硪黄鉀QIntellij IDEA 使用Spring-boot-devTools無效的問題。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07

