Java Spring Boot 集成Zookeeper
集成步驟
1.pom.xml文件配置,引入相關(guān)jar包
Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊(cè)Watcher和NodeExistsException異常等等。
<!-- 封裝了一些高級(jí)特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式Barrier -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>特殊說(shuō)明: 1.無(wú)需引入curator-framework,因?yàn)閏urator-recipes自動(dòng)關(guān)聯(lián)依賴引入curator-framework。 2.curator會(huì)默認(rèn)引入zookeeper的jar報(bào),需要檢查版本與服務(wù)器的版本是否一致,如果不一致則需要排除引入 3.
2. 核心配置類
@Configuration
public class ZookeeperConfig implements Serializable
{
private static final long serialVersionUID = -9025878246972668136L;
private final ZooKeeperProperty zooKeeperProperty;
public ZookeeperConfig(ZooKeeperProperty zooKeeperProperty) {
this.zooKeeperProperty = zooKeeperProperty;
}
@Bean
public CuratorFramework curatorFramework()
{
RetryPolicy retryPolicy = new ExponentialBackoffRetry(zooKeeperProperty.getBaseSleepTime(),
zooKeeperProperty.getMaxRetries());
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zooKeeperProperty.getServers())
.connectionTimeoutMs(zooKeeperProperty.getConnectionTimeout())
.sessionTimeoutMs(zooKeeperProperty.getSessionTimeout())
.retryPolicy(retryPolicy)
.build();
client.start();
return client;
}
@Bean
@ConditionalOnMissingBean
public ZooKeeperUtils zooKeeperTemplate(CuratorFramework client) {
return new ZooKeeperUtils(client);
}
}
@ConfigurationProperties(prefix="zookeeper")
@Component
public class ZooKeeperProperty implements Serializable
{
private static final long serialVersionUID = 8650758711482699256L;
/**
* zk連接集群,多個(gè)用逗號(hào)隔開
*/
private String servers;
/**
* 會(huì)話超時(shí)時(shí)間
*/
private int sessionTimeout = 60000;
/**
* 連接超時(shí)時(shí)間
*/
private int connectionTimeout = 15000;
/**
* 初始重試等待時(shí)間(毫秒)
*/
private int baseSleepTime = 1000;
/**
* 重試最大次數(shù)
*/
private int maxRetries = 10;
//省略get、set方法
......
}3.常用API功能
@Component
public class ZooKeeperUtils
{
private static final Logger logger = LoggerFactory
.getLogger(ZooKeeperUtils.class);
/**
* 路徑分隔符
*/
private static final String PATH_SEPARATOR = "/";
/**
* zk連接
*/
private final CuratorFramework client;
public ZooKeeperUtils(CuratorFramework client)
{
this.client = client;
}
/**
* 創(chuàng)建空節(jié)點(diǎn),默認(rèn)持久節(jié)點(diǎn)
*
* @param path
* 節(jié)點(diǎn)路徑
* @param node
* 節(jié)點(diǎn)名稱
* @return 完整路徑
*/
public String createNode(String path, String node)
{
return createNode(path, node, CreateMode.PERSISTENT);
}
/**
* 創(chuàng)建帶類型的空節(jié)點(diǎn)
*
* @param path
* 節(jié)點(diǎn)路徑
* @param node
* 節(jié)點(diǎn)名稱
* @param createMode
* 類型 CreateMode.PERSISTENT: 創(chuàng)建節(jié)點(diǎn)后,不刪除就永久存在
* CreateMode.PERSISTENT_SEQUENTIAL:節(jié)點(diǎn)path末尾會(huì)追加一個(gè)10位數(shù)的單調(diào)遞增的序列
* CreateMode.EPHEMERAL:創(chuàng)建后,回話結(jié)束節(jié)點(diǎn)會(huì)自動(dòng)刪除
* CreateMode.EPHEMERAL_SEQUENTIAL:節(jié)點(diǎn)path末尾會(huì)追加一個(gè)10位數(shù)的單調(diào)遞增的序列
* @return 路徑
*/
public String createNode(String path, String node, CreateMode createMode)
{
path = buildPath(path, node);
logger.info("create node for path: {} with createMode: {}", path,
createMode.name());
try
{
client.create().creatingParentsIfNeeded().withMode(createMode)
.forPath(path);
logger.info("create node :{} sucessfully", node);
return path;
}
catch (Exception e)
{
logger.error(
"create node for path: {} with createMode: {} failed!",
path, createMode.name(), e);
return null;
}
}
/**
* 創(chuàng)建節(jié)點(diǎn),默認(rèn)持久節(jié)點(diǎn)
*
* @param path
* 節(jié)點(diǎn)路徑
* @param node
* 節(jié)點(diǎn)名稱
* @param value
* 節(jié)點(diǎn)值
* @return 完整路徑
*/
public String createNode(String path, String node, String value)
{
return createNode(path, node, value, CreateMode.PERSISTENT);
}
/**
* 創(chuàng)建節(jié)點(diǎn),默認(rèn)持久節(jié)點(diǎn)
*
* @param path
* 節(jié)點(diǎn)路徑
* @param node
* 節(jié)點(diǎn)名稱
* @param value
* 節(jié)點(diǎn)值
* @param createMode
* 節(jié)點(diǎn)類型
* @return 完整路徑
*/
public String createNode(String path, String node, String value,
CreateMode createMode)
{
if (Objects.isNull(value))
{
logger.error("ZooKeeper節(jié)點(diǎn)值不能為空!");
}
path = buildPath(path, node);
logger.info("create node for path: {}, value: {}, with createMode: {}",
path, value, createMode.name());
try
{
client.create().creatingParentsIfNeeded().withMode(createMode)
.forPath(path, value.getBytes());
return path;
}
catch (Exception e)
{
logger.error(
"create node for path: {}, value: {}, with createMode: {} failed!",
path, value, createMode.name(), e);
}
return null;
}
/**
* 獲取節(jié)點(diǎn)數(shù)據(jù)
*
* @param path
* 路徑
* @param node
* 節(jié)點(diǎn)名稱
* @return 完整路徑
*/
public String get(String path, String node)
{
path = buildPath(path, node);
try
{
byte[] bytes = client.getData().forPath(path);
if (bytes.length > 0)
{
return new String(bytes);
}
}
catch (Exception e)
{
logger.error("get value for path: {}, node: {} failed!", path,
node, e);
}
return null;
}
/**
* 更新節(jié)點(diǎn)數(shù)據(jù)
*
* @param path
* 節(jié)點(diǎn)路徑
* @param node
* 節(jié)點(diǎn)名稱
* @param value
* 更新值
* @return 完整路徑
*/
public String update(String path, String node, String value)
{
if (Objects.isNull(value))
{
logger.error("ZooKeeper節(jié)點(diǎn)值不能為空!");
}
path = buildPath(path, node);
logger.info("update path: {} to value: {}", path, value);
try
{
client.setData().forPath(path, value.getBytes());
return path;
}
catch (Exception e)
{
logger.error("update path: {} to value: {} failed!", path, value);
}
return null;
}
/**
* 刪除節(jié)點(diǎn),并且遞歸刪除子節(jié)點(diǎn)
*
* @param path
* 路徑
* @param node
* 節(jié)點(diǎn)名稱
* @return 路徑
*/
public boolean delete(String path, String node)
{
path = buildPath(path, node);
logger.info("delete node for path: {}", path);
try
{
client.delete().deletingChildrenIfNeeded().forPath(path);
return true;
}
catch (Exception e)
{
logger.error("delete node for path: {} failed!", path);
}
return false;
}
/**
* 獲取子節(jié)點(diǎn)
*
* @param path
* 節(jié)點(diǎn)路徑
* @return
*/
public List<String> getChildren(String path)
{
if (StringUtils.isEmpty(path))
{
return null;
}
if (!path.startsWith(PATH_SEPARATOR))
{
path = PATH_SEPARATOR + path;
}
try
{
return client.getChildren().forPath(path);
}
catch (Exception e)
{
logger.error("get children path:{} error", path, e);
}
return null;
}
/**
* 判斷節(jié)點(diǎn)是否存在
*
* @param path
* 路徑
* @param node
* 節(jié)點(diǎn)名稱
* @return 結(jié)果
*/
public boolean exists(String path, String node)
{
try
{
List<String> list = getChildren(path);
return !CollectionUtils.isEmpty(list) && list.contains(node);
}
catch (Exception e)
{
return false;
}
}
/**
* 申請(qǐng)鎖,指定請(qǐng)求等待時(shí)間
*
* @param path
* 加鎖zk節(jié)點(diǎn)
* @param time
* 時(shí)間
* @param unit
* 時(shí)間單位
* @param runnable
* 執(zhí)行方法
*/
public void lock(String path, long time, TimeUnit unit, Runnable runnable)
{
try
{
InterProcessMutex lock = new InterProcessMutex(client, path);
if (lock.acquire(time, unit))
{
try
{
runnable.run();
}
finally
{
lock.release();
}
}
else
{
logger.error("獲取鎖超時(shí):{}!", path);
}
}
catch (Exception e)
{
logger.error("獲取鎖失敗: {}!", path);
}
}
/**
* 申請(qǐng)鎖,指定請(qǐng)求等待時(shí)間
*
* @param path
* 加鎖zk節(jié)點(diǎn)
* @param time
* 時(shí)間
* @param unit
* 時(shí)間單位
* @param callable
* 執(zhí)行方法
* @return .
*/
public <T> T lock(String path, long time, TimeUnit unit,
Callable<T> callable)
{
try
{
InterProcessMutex lock = new InterProcessMutex(client, path);
if (lock.acquire(time, unit))
{
try
{
return callable.call();
}
finally
{
lock.release();
}
}
else
{
logger.error("獲取鎖超時(shí):{}!", path);
}
}
catch (Exception e)
{
logger.error("獲取鎖失敗: {}!", path);
}
return null;
}
/* *//**
* 對(duì)一個(gè)節(jié)點(diǎn)進(jìn)行監(jiān)聽,監(jiān)聽事件包括指定的路徑節(jié)點(diǎn)的增、刪、改的操作
*
* @param path
* 節(jié)點(diǎn)路徑
* @param listener
* 回調(diào)方法
* @throws Exception
*/
public void watchNode(String path,boolean dataIsCompressed,final ZooKeeperCallback zooKeeperCallback)throws Exception
{
try
{
final NodeCache nodeCache = new NodeCache(client, path,dataIsCompressed);
nodeCache.getListenable().addListener(new NodeCacheListener()
{
public void nodeChanged() throws Exception
{
ChildData childData = nodeCache.getCurrentData();
logger.info("ZNode節(jié)點(diǎn)狀態(tài)改變, path={}", childData.getPath());
logger.info("ZNode節(jié)點(diǎn)狀態(tài)改變, data={}", childData.getData());
logger.info("ZNode節(jié)點(diǎn)狀態(tài)改變, stat={}", childData.getStat());
//處理業(yè)務(wù)邏輯
zooKeeperCallback.call();
}
});
nodeCache.start();
}
catch (Exception e)
{
logger.error("創(chuàng)建NodeCache監(jiān)聽失敗, path={}",path);
}
}
/**
* 對(duì)指定的路徑節(jié)點(diǎn)的一級(jí)子目錄進(jìn)行監(jiān)聽,不對(duì)該節(jié)點(diǎn)的操作進(jìn)行監(jiān)聽,對(duì)其子目錄的節(jié)點(diǎn)進(jìn)行增、刪、改的操作監(jiān)聽
*
* @param path
* 節(jié)點(diǎn)路徑
* @param listener
* 回調(diào)方法
*/
public void watchChildren(String path, PathChildrenCacheListener listener)
{
try
{
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,
path, true);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
pathChildrenCache.getListenable().addListener(listener);
}
catch (Exception e)
{
logger.error("watch children failed for path: {}", path, e);
}
}
/**
* 將指定的路徑節(jié)點(diǎn)作為根節(jié)點(diǎn)(祖先節(jié)點(diǎn)),對(duì)其所有的子節(jié)點(diǎn)操作進(jìn)行監(jiān)聽,呈現(xiàn)樹形目錄的監(jiān)聽,可以設(shè)置監(jiān)聽深度,最大監(jiān)聽深度為2147483647(
* int類型的最大值)
*
* @param path
* 節(jié)點(diǎn)路徑
* @param maxDepth
* 回調(diào)方法
* @param listener
* 監(jiān)聽
*/
public void watchTree(String path, int maxDepth, TreeCacheListener listener)
{
try
{
TreeCache treeCache = TreeCache.newBuilder(client, path)
.setMaxDepth(maxDepth).build();
treeCache.start();
treeCache.getListenable().addListener(listener);
}
catch (Exception e)
{
logger.error("watch tree failed for path: {}", path, e);
}
}
public String buildPath(String path, String node)
{
if (StringUtils.isEmpty(path) || StringUtils.isEmpty(node))
{
logger.error("ZooKeeper路徑或者節(jié)點(diǎn)名稱不能為空!");
}
if (!path.startsWith(PATH_SEPARATOR))
{
path = PATH_SEPARATOR + path;
}
if (PATH_SEPARATOR.equals(path))
{
return path + node;
}
else
{
return path + PATH_SEPARATOR + node;
}
}
}4.基本使用
@Autowired
private ZooKeeperUtils zooKeeperUtil;
@RequestMapping("/addNode")
public String addNode()
{
String path= zooKeeperUtil.createNode("/zookeeper", "node1");
return path;
}特殊說(shuō)明:關(guān)于zookeeper的分布式鎖,后續(xù)講解常用分布式鎖的時(shí)候,會(huì)詳細(xì)說(shuō)明。
常見錯(cuò)誤和解決辦法
問題1:調(diào)用api創(chuàng)建zookeeper節(jié)點(diǎn)時(shí),報(bào)KeeperErrorCode = Unimplemented for /test錯(cuò)誤。
原因:服務(wù)器安裝zookeeper的版本與程序中的zookeeper版本不一致。
解決方案: 登錄服務(wù)器,查看zookeeper安裝版本,執(zhí)行如下命令:
echo stat|nc 127.0.0.1 2181

當(dāng)前引入的zookeeper版本為3.4.13,而zookeeper的版本與curator對(duì)應(yīng)關(guān)系如下:
Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x Curator 4.x.x - compatible only with ZooKeeper 3.5.x and includes support for new features such as dynamic reconfiguration, etc. Curator 5.x.x compatible only with ZooKeeper 3.6.x+
問題2:?jiǎn)?dòng)項(xiàng)目的日志中會(huì)有Will not attempt to authenticate using SASL錯(cuò)誤
起初我認(rèn)為是zookeeper需要進(jìn)行SASL認(rèn)證,但是通過(guò)查閱相關(guān)資料后,才知道3.4之前版本,zookeeper默認(rèn)會(huì)采用SASL認(rèn)證,3.4以后的版本沒有此類問題。
到此這篇關(guān)于Java Spring Boot 集成Zookeeper的文章就介紹到這了,更多相關(guān)Spring Boot 集成Zookeeper內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 淺談Java(SpringBoot)基于zookeeper的分布式鎖實(shí)現(xiàn)
- 使用dubbo+zookeeper+spring boot構(gòu)建服務(wù)的方法詳解
- SpringBoot中dubbo+zookeeper實(shí)現(xiàn)分布式開發(fā)的應(yīng)用詳解
- SpringBoot集成Curator實(shí)現(xiàn)Zookeeper基本操作的代碼示例
- SpringBoot系列教程之dubbo和Zookeeper集成方法
- SpringBoot整合Dubbo+Zookeeper實(shí)現(xiàn)RPC調(diào)用
- springboot應(yīng)用訪問zookeeper的流程
- SpringBoot整合Zookeeper詳細(xì)教程
- SpringBoot讀取ZooKeeper(ZK)屬性的方法實(shí)現(xiàn)
相關(guān)文章
Java使用FutureTask實(shí)現(xiàn)預(yù)加載的示例詳解
基于FutureTask的特性,通常可以使用FutureTask做一些預(yù)加載工作,比如一些時(shí)間較長(zhǎng)的計(jì)算等,本文就來(lái)和大家講講具體實(shí)現(xiàn)方法吧,感興趣的可以了解一下2023-06-06
java對(duì)圖片進(jìn)行壓縮和resize縮放的方法
本篇文章主要介紹了java對(duì)圖片進(jìn)行壓縮和resize調(diào)整的方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-07-07
Spring boot2基于Mybatis實(shí)現(xiàn)多表關(guān)聯(lián)查詢
這篇文章主要介紹了Spring boot2基于Mybatis實(shí)現(xiàn)多表關(guān)聯(lián)查詢,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
java開發(fā)使用StringUtils.split避坑詳解
這篇文章主要為大家介紹了java開發(fā)使用StringUtils.split避坑詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11

