Java實(shí)現(xiàn)ZooKeeper的zNode監(jiān)控
上一篇文章已經(jīng)完成了ZooKeeper的基本搭建和使用的介紹,現(xiàn)在開始用代碼說話。參考 https://zookeeper.apache.org/doc/current/javaExample.html ,但對場景和代碼都做了簡化,只實(shí)現(xiàn)基本的Watcher功能。
1 場景設(shè)計(jì)
目的是體驗(yàn)ZooKeeper的Watcher功能。程序監(jiān)控ZooKeeper的/watcher節(jié)點(diǎn)數(shù)據(jù)變化,當(dāng)創(chuàng)建或修改數(shù)據(jù)時(shí),控制臺打印當(dāng)前的數(shù)據(jù)內(nèi)容和版本號;當(dāng)/watcher被刪除時(shí),程序退出。
/watcher的創(chuàng)建、修改和刪除操作,使用控制臺或zkui操作。
2 搭建Maven項(xiàng)目
代碼相對比較簡單,就不用SpringBoot這個(gè)大殺器了,使用一個(gè)普通的Maven項(xiàng)目即可。
ZooKeeper客戶端使用官方提供的Java庫,org.apache.zookeeper: zookeeper: 3.5.5。日志框架使用習(xí)慣的slf4j+log4j2,ZooKeeper缺省使用的是log4j V1,因此在引入的時(shí)候需要excludes。另外,使用了lombok來簡化一些代碼。
以下是pom.xml的內(nèi)容
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tech.codestory.research</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-web</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.28</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-ext</artifactId>
<version>1.7.28</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
3 log4j2.xml
在項(xiàng)目的 src/main/resources 下創(chuàng)建一個(gè)文件 log4j2.xml,內(nèi)容為
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="DEBUG" name="codeStoryLogger">
<appenders>
<Console name="stdout">
<ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%d{HH:mm:ss:SSS} [%p] - %c{1}.%M(%L) - %m%n"/>
</Console>
</appenders>
<ThresholdFilter level="TRACE"/>
<loggers>
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" additivity="false">
<appender-ref ref="stdout" />
</logger>
<logger name="org.apache.zookeeper" level="TRACE" additivity="false">
<appender-ref ref="stdout"/>
</logger>
<logger name="tech.codestory" level="TRACE" additivity="false">
<appender-ref ref="stdout"/>
</logger>
<root level="WARN">
<appender-ref ref="stdout"/>
</root>
</loggers>
</configuration>
4 創(chuàng)建ZooKeeper連接
創(chuàng)建連接代碼比較簡單,只需要?jiǎng)?chuàng)建 ZooKeeper對象就行,
ZooKeeper構(gòu)造函數(shù)的定義
/** * 創(chuàng)建一個(gè) ZooKeeper 客戶端對象 * @param connectString 逗號分隔的 host:port 字符串, * 單點(diǎn)如 127.0.0.1:2181, * 集群如 192.168.5.128:2181,192.168.5.129:2181,192.168.5.130:2181, * 還可以指定根節(jié)點(diǎn),如 127.0.0.1:2181/foo/bar * @param sessionTimeout 毫秒為單位的超時(shí)時(shí)間 * @param watcher Watcher對象,用于接收 MatcherEvent * @throws IOException 網(wǎng)絡(luò)錯(cuò)誤時(shí)拋出異常 * @throws IllegalArgumentException 如果Root路徑設(shè)置錯(cuò)誤 */ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException;
寫一段測試代碼,創(chuàng)建zk對象后判斷某一個(gè)zNode是否存在。
public class ZooKeeperWatcher implements Watcher {
/** ZooKeeper的客戶端連接 */
ZooKeeper zk;
public ZooKeeperWatcher(String hostPort, String zNode) throws KeeperException, IOException {
zk = new ZooKeeper(hostPort, 3000, this);
try {
Stat exists = zk.exists(zNode, true);
if(exists == null){
log.info(“{} 不存在”, zNode)
}
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
}
}
運(yùn)行這段代碼,發(fā)現(xiàn)會拋異常
java.net.SocketException: Socket is not connected
at sun.nio.ch.Net.translateToSocketException(Net.java:162) ~[?:?]
at sun.nio.ch.Net.translateException(Net.java:196) ~[?:?]
at sun.nio.ch.Net.translateException(Net.java:202) ~[?:?]
at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:400) ~[?:?]
at org.apache.zookeeper.ClientCnxnSocketNIO.cleanup(ClientCnxnSocketNIO.java:198) [zookeeper-3.5.5.jar:3.5.5]
at org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1338) [zookeeper-3.5.5.jar:3.5.5]
at org.apache.zookeeper.ClientCnxn$SendThread.cleanAndNotifyState(ClientCnxn.java:1276) [zookeeper-3.5.5.jar:3.5.5]
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1254) [zookeeper-3.5.5.jar:3.5.5]
Caused by: java.nio.channels.NotYetConnectedException
at sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:917) ~[?:?]
at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:398) ~[?:?]
... 4 more
NotYetConnectedException的字面意思是連接還沒有創(chuàng)建好,網(wǎng)絡(luò)搜索了一下,建立連接需要一些時(shí)間,創(chuàng)建zk對象后馬上調(diào)用exists命令,這時(shí)候連接還沒有創(chuàng)建好,就會拋異常。ZooKeeper在連接建立成功之后,會發(fā)送一個(gè)WatchedEvent事件,我們可以利用這個(gè)事件完成建立連接的過程。修改后的代碼如下,順便添加了slf4j-ext中的Profiler,用于記錄所消耗的時(shí)間。
public class ZooKeeperWatcher implements Watcher {
/** 等待連接建立成功的信號 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
/** ZooKeeper的客戶端連接 */
ZooKeeper zk;
public ZooKeeperWatcher(String hostPort, String zNode) throws KeeperException, IOException {
Profiler profiler = new Profiler("連接到ZooKeeper");
profiler.start("開始鏈接");
zk = new ZooKeeper(hostPort, 3000, this);
try {
profiler.start("等待連接成功的Event");
connectedSemaphore.await();
Stat exists = zk.exists(zNode, true);
if(exists == null){
log.info(“{} 不存在”, zNode)
}
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
profiler.stop();
profiler.setLogger(log);
profiler.log();
}
/** 收到ZooKeeper的WatchedEvent */
@Override
public void process(WatchedEvent event) {
log.info("event = {}", event);
if (Event.EventType.None.equals(event.getType())) {
// 連接狀態(tài)發(fā)生變化
if (Event.KeeperState.SyncConnected.equals(event.getState())) {
// 連接建立成功
connectedSemaphore.countDown();
}
}
}
}
修改代碼之后的執(zhí)行記錄日志如下,可以看到等待連接成功的Event耗時(shí)9秒多。網(wǎng)絡(luò)上有文章說關(guān)閉防火墻可以秒連,但我測試過,沒發(fā)現(xiàn)有什么變化,使用systemctl stop firewalld之后重新執(zhí)行程序,仍然需要9秒多。
[INFO] - ZooKeeperWatcher.process(61) - event = WatchedEvent state:SyncConnected type:None path:null [DEBUG] - ZooKeeperWatcher.log(201) - + Profiler [連接到ZooKeeper] |-- elapsed time [開始鏈接] 78.912 milliseconds. |-- elapsed time [等待連接成功的Event] 9330.606 milliseconds. |-- Total [連接到ZooKeeper] 9409.926 milliseconds. [INFO] - ZooKeeperWatcher.readNodeData(95) - /watcher 不存在
5 讀取WatchedEvent
前面的代碼,只是處理了建立連接成功時(shí)的Event,下面再來看看讀取數(shù)據(jù)的過程。關(guān)鍵代碼如下:
if (Event.EventType.NodeDataChanged.equals(event.getType())
|| Event.EventType.NodeCreated.equals(event.getType())) {
String path = event.getPath();
if (path != null && path.equals(zNode)) {
// 節(jié)點(diǎn)數(shù)據(jù)被修改
readNodeData();
}
}
/** 讀節(jié)點(diǎn)數(shù)據(jù) */
private void readNodeData() {
try {
Stat stat = new Stat();
byte[] data = zk.getData(zNode, true, stat);
if (data != null) {
log.info("{}, value={}, version={}", zNode, new String(data), stat.getVersion());
}
} catch (KeeperException e) {
log.info("{} 不存在", zNode);
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
}
當(dāng)接收到創(chuàng)建節(jié)點(diǎn)和修改節(jié)點(diǎn)的WatchedEvent,都會將數(shù)據(jù)讀出并打印在控制臺。
6 調(diào)整后的完整程序清單
對前面的代碼做了部分調(diào)整,同時(shí)添加了退出系統(tǒng)的機(jī)制:節(jié)點(diǎn)被刪除。
package tech.codestory.zookeeper.watcher;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.profiler.Profiler;
import lombok.extern.slf4j.Slf4j;
/**
* 用于測試 ZooKeeper的 WatchedEvent用法
* @author code story
* @date 2019/8/13
*/
@Slf4j
public class ZooKeeperWatcher implements Watcher, Runnable {
/** 等待連接建立成功的信號 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
/** 退出系統(tǒng)的信號 */
static Integer quitSemaphore = Integer.valueOf(-1);
String zNode;
ZooKeeper zk;
public ZooKeeperWatcher(String hostPort, String zNode) throws KeeperException, IOException {
this.zNode = zNode;
Profiler profiler = new Profiler("連接到ZooKeeper");
profiler.start("開始鏈接");
zk = new ZooKeeper(hostPort, 3000, this);
try {
profiler.start("等待連接成功的Event");
connectedSemaphore.await();
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
profiler.stop();
profiler.setLogger(log);
profiler.log();
// 先讀當(dāng)前的數(shù)據(jù)
readNodeData();
}
/** 收到ZooKeeper的WatchedEvent */
@Override
public void process(WatchedEvent event) {
log.info("event = {}", event);
if (Event.EventType.None.equals(event.getType())) {
// 連接狀態(tài)發(fā)生變化
if (Event.KeeperState.SyncConnected.equals(event.getState())) {
// 連接建立成功
connectedSemaphore.countDown();
}
} else if (Event.EventType.NodeDataChanged.equals(event.getType())
|| Event.EventType.NodeCreated.equals(event.getType())) {
String path = event.getPath();
if (path != null && path.equals(zNode)) {
// 節(jié)點(diǎn)數(shù)據(jù)被修改
readNodeData();
}
} else if (Event.EventType.NodeDeleted.equals(event.getType())) {
String path = event.getPath();
if (path != null && path.equals(zNode)) {
synchronized (quitSemaphore) {
// 節(jié)點(diǎn)被刪除,通知退出線程
quitSemaphore.notify();
}
}
}
}
/** 讀節(jié)點(diǎn)數(shù)據(jù) */
private void readNodeData() {
try {
Stat stat = new Stat();
byte[] data = zk.getData(zNode, true, stat);
if (data != null) {
log.info("{}, value={}, version={}", zNode, new String(data), stat.getVersion());
}
} catch (KeeperException e) {
log.info("{} 不存在", zNode);
try {
// 目的是添加Watcher
zk.exists(zNode, true);
} catch (KeeperException ex) {
} catch (InterruptedException ex) {
}
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
}
@Override
public void run() {
try {
synchronized (quitSemaphore) {
quitSemaphore.wait();
log.info("{} 被刪除,退出", zNode);
}
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
}
/** 主程序,代碼中寫死了Server地址和zNode名,也可以改成從args中讀取 */
public static void main(String[] args) {
String hostPort = "192.168.5.128:2181";
String zNode = "/watcher";
try {
new ZooKeeperWatcher(hostPort, zNode).run();
} catch (Exception e) {
log.error("new ZooKeeperExecutor()", e);
}
}
}
做一個(gè)測試,應(yīng)用啟動(dòng)后創(chuàng)建節(jié)點(diǎn),修改多次節(jié)點(diǎn),最后刪除節(jié)點(diǎn),日志輸出如下:
10:13:31:979 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:None path:null 10:13:31:982 [DEBUG] - ZooKeeperWatcher.log(201) - + Profiler [連接到ZooKeeper] |-- elapsed time [開始鏈接] 210.193 milliseconds. |-- elapsed time [等待連接成功的Event] 9385.467 milliseconds. |-- Total [連接到ZooKeeper] 9596.196 milliseconds. 10:13:31:996 [INFO] - ZooKeeperWatcher.readNodeData(84) - /watcher 不存在 10:15:43:451 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeCreated path:/watcher 10:15:43:463 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 00, version=0 10:15:50:906 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher 10:15:50:910 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 01, version=1 10:15:56:004 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher 10:15:56:007 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 02, version=2 10:16:00:246 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher 10:16:00:249 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 03, version=3 10:16:06:399 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher 10:16:06:402 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 10, version=4 10:16:10:217 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher 10:16:10:220 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 11, version=5 10:16:14:444 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher 10:16:14:447 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 12, version=6 10:16:20:118 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDeleted path:/watcher 10:16:20:118 [INFO] - ZooKeeperWatcher.run(101) - /watcher 被刪除,退出
總結(jié)
以上所述是小編給大家介紹的Java實(shí)現(xiàn)ZooKeeper的zNode監(jiān)控,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時(shí)回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
如果你覺得本文對你有幫助,歡迎轉(zhuǎn)載,煩請注明出處,謝謝!
相關(guān)文章
SpringBoot項(xiàng)目找不到接口報(bào)404錯(cuò)誤的解決辦法
寫了一個(gè)簡單的springboot項(xiàng)目,在啟動(dòng)的時(shí)候idea未報(bào)錯(cuò),瀏覽器訪問接口時(shí)報(bào)404的錯(cuò)誤,所以本文給大家介紹了SpringBoot項(xiàng)目找不到接口報(bào)404錯(cuò)誤的解決辦法,文中有相關(guān)的圖文供大家參考,需要的朋友可以參考下2024-12-12
java多線程Thread的實(shí)現(xiàn)方法代碼詳解
這篇文章主要介紹了java多線程Thread的實(shí)現(xiàn)方法代碼詳解,涉及start(),run(),stop(),interrupt(),isInterrupted(),join()和join(long millis)等方法的介紹,具有一定借鑒價(jià)值,需要的朋友可以了解下。2017-11-11
解決@Api注解不展示controller內(nèi)容的問題
這篇文章主要介紹了解決@Api注解不展示controller內(nèi)容的問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。2022-01-01
SpringBoot使用Flyway進(jìn)行數(shù)據(jù)庫遷移的實(shí)現(xiàn)示例
Flyway是一個(gè)數(shù)據(jù)庫遷移工具,它提供遷移歷史和回滾的功能,本文主要介紹了如何使用Flyway來管理Spring Boot應(yīng)用程序中的SQL數(shù)據(jù)庫架構(gòu),感興趣的可以了解一下2023-08-08
Nacos動(dòng)態(tài)配置管理機(jī)制方式
這篇文章主要介紹了Nacos動(dòng)態(tài)配置管理機(jī)制方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07

