ZooKeeper開(kāi)發(fā)實(shí)際應(yīng)用案例實(shí)戰(zhàn)
ZooKeeper入門教程二在單機(jī)和集群環(huán)境下的安裝搭建及使用
ZooKeeper入門教程三分布式鎖實(shí)現(xiàn)及完整運(yùn)行源碼
ZooKeeper框架教程Curator分布式鎖實(shí)現(xiàn)及源碼分析
前面幾章,我們學(xué)習(xí)了zookeeper的概念和使用,并且分析了curator通過(guò)zookeeper實(shí)現(xiàn)分布式鎖的源代碼,我們已經(jīng)熟知zookeeper協(xié)調(diào)分布式系統(tǒng)的方式,相信大家一定會(huì)思考自己的項(xiàng)目場(chǎng)景中是否有zookeeper的用武之地。沒(méi)錯(cuò),我們學(xué)習(xí)的最終目的是要去應(yīng)用它。本章,我通過(guò)實(shí)際工作中的一個(gè)例子,講解zookeeper是如何幫我解決分布式問(wèn)題,以此引導(dǎo)大家發(fā)現(xiàn)自己系統(tǒng)中可以應(yīng)用zookeeper的場(chǎng)景。真正把zookeeper使用起來(lái)!
項(xiàng)目背景介紹
首先給大家介紹一下本文描述項(xiàng)目的情況。這是一個(gè)檢索網(wǎng)站,它讓你能在幾千萬(wàn)份復(fù)雜文檔數(shù)據(jù)中檢索出你所需要的文檔數(shù)據(jù)。為了加快檢索速度,項(xiàng)目的數(shù)據(jù)分布在100臺(tái)機(jī)器的內(nèi)存里,我們稱之為數(shù)據(jù)服務(wù)器。除了數(shù)據(jù),這100臺(tái)機(jī)器上均部署著檢索程序。這些server之外,還有數(shù)臺(tái)給前端提供接口的搜索server,這些機(jī)器屬一個(gè)集群,我們稱之為檢索服務(wù)器。當(dāng)搜索請(qǐng)求過(guò)來(lái)時(shí),他們負(fù)責(zé)把搜索請(qǐng)求轉(zhuǎn)發(fā)到那100臺(tái)機(jī)器,待所有機(jī)器返回結(jié)果后進(jìn)行合并,最終返回給前端頁(yè)面。結(jié)構(gòu)如下圖:

面臨問(wèn)題
網(wǎng)站上線之初,由于數(shù)據(jù)只有幾百萬(wàn),所以數(shù)據(jù)服務(wù)器只有10多臺(tái)。是一個(gè)規(guī)模比較小的分布式系統(tǒng),當(dāng)時(shí)沒(méi)有做分布式系統(tǒng)的協(xié)調(diào),也能正常工作,偶爾出問(wèn)題,馬上解決。但是到了近期,機(jī)器增長(zhǎng)到100臺(tái),網(wǎng)站幾乎每天都會(huì)出現(xiàn)問(wèn)題,導(dǎo)致整個(gè)分布式系統(tǒng)掛掉。問(wèn)題原因如下:
數(shù)據(jù)服務(wù)器之前沒(méi)有做分布式協(xié)調(diào)。對(duì)于檢索服務(wù)器來(lái)說(shuō),并不知道哪些數(shù)據(jù)服務(wù)器還存活,所以檢索服務(wù)器每次檢索,都會(huì)等待100臺(tái)機(jī)器返回結(jié)果。但假如100臺(tái)數(shù)據(jù)服務(wù)中某一臺(tái)死掉了,檢索服務(wù)器也會(huì)長(zhǎng)時(shí)間等待他的返回。這導(dǎo)致了檢索服務(wù)器積累了大量的請(qǐng)求,最終被壓垮。當(dāng)所有的檢索服務(wù)器都被壓垮時(shí),那么網(wǎng)站也就徹底不可用了。

問(wèn)題的本質(zhì)為檢索服務(wù)器維護(hù)的數(shù)據(jù)服務(wù)器列表是靜態(tài)不變的,不能感知數(shù)據(jù)服務(wù)器的上下線。

在10臺(tái)數(shù)據(jù)服務(wù)器的時(shí)候,某一臺(tái)機(jī)器出問(wèn)題的概率很小。但當(dāng)增長(zhǎng)到100臺(tái)服務(wù)器時(shí),出問(wèn)題的概率變成了10倍。所以才會(huì)導(dǎo)致網(wǎng)站幾乎每天都要死掉一次。
由于一臺(tái)機(jī)器的問(wèn)題,導(dǎo)致100臺(tái)機(jī)器的分布式系統(tǒng)不可用,這是極其不合理,也是無(wú)法忍受的。
之前此項(xiàng)目的數(shù)據(jù)和檢索不由我負(fù)責(zé)。了解到此問(wèn)題的時(shí)候,我覺(jué)得這個(gè)問(wèn)題得立刻解決,否則不但用戶體驗(yàn)差,而且開(kāi)發(fā)和運(yùn)維也要每天疲于系統(tǒng)維護(hù),浪費(fèi)了大量資源,但由于還有很多新的需求在開(kāi)發(fā),原來(lái)的團(tuán)隊(duì)也沒(méi)時(shí)間去處理。今年我有機(jī)會(huì)來(lái)解決這個(gè)問(wèn)題,當(dāng)時(shí)正好剛剛研究完zookeeper,立刻想到這正是采用zookeeper的典型場(chǎng)景。
如何解決
我直接說(shuō)方案,程序分為數(shù)據(jù)服務(wù)器和檢索服務(wù)器兩部分。
數(shù)據(jù)服務(wù)器:
1、每臺(tái)數(shù)據(jù)服務(wù)器啟動(dòng)時(shí)候以臨時(shí)節(jié)點(diǎn)的形式把自己注冊(cè)到zookeeper的某節(jié)點(diǎn)下,如/data_servers。這樣當(dāng)某數(shù)據(jù)服務(wù)器死掉時(shí),session斷開(kāi)鏈接,該節(jié)點(diǎn)被刪除。
檢索服務(wù)器:
1、啟動(dòng)時(shí),加載/data_servers下所有子節(jié)點(diǎn)數(shù)據(jù),獲取了目前所有能提供服務(wù)的數(shù)據(jù)服務(wù)器列表,并且加載到內(nèi)存中。
2、啟動(dòng)時(shí),同時(shí)監(jiān)聽(tīng)/data_servers節(jié)點(diǎn),當(dāng)新的數(shù)據(jù)server上線或者某個(gè)server下線時(shí),獲得通知,然后重新加載/data_servers下所有子節(jié)點(diǎn)數(shù)據(jù),刷新內(nèi)存中數(shù)據(jù)服務(wù)器列表。
通過(guò)以上方案,做到數(shù)據(jù)服務(wù)器上下線時(shí),檢索服務(wù)器能夠動(dòng)態(tài)感知。檢索服務(wù)器在檢索前,從內(nèi)存中取得的數(shù)據(jù)服務(wù)器列表將是最新的、可用的。即使在刷新時(shí)間差內(nèi)取到了掉線的數(shù)據(jù)服務(wù)器也沒(méi)關(guān)系,最多影響本次查詢,而不會(huì)拖垮整個(gè)集群。見(jiàn)下圖:

代碼講解
捋清思路后,其實(shí)代碼就比較簡(jiǎn)單了。數(shù)據(jù)服務(wù)器只需要啟動(dòng)的時(shí)候?qū)憐ookeeper臨時(shí)節(jié)點(diǎn)就好了,同時(shí)寫入自己服務(wù)器的相關(guān)信息,比如ip、port之類。檢索無(wú)服務(wù)器端會(huì)稍微復(fù)雜點(diǎn),不過(guò)此處場(chǎng)景和zookeeper官方給的例子十分符合,所以我直接參考官方例子進(jìn)行修改,實(shí)現(xiàn)起來(lái)也很簡(jiǎn)單。關(guān)于官方例子我寫過(guò)兩篇博文,可以參考學(xué)習(xí):
zookeeper官方例子翻譯:ZooKeeper官方文檔之Java客戶端開(kāi)發(fā)案例翻譯
zookeeper官方例子解讀:ZooKeeper官方文檔之Java案例解讀
數(shù)據(jù)服務(wù)器
數(shù)據(jù)服務(wù)器程序十分簡(jiǎn)單,只會(huì)做一件事情:?jiǎn)?dòng)的時(shí)候,把自己以臨時(shí)節(jié)點(diǎn)的形式注冊(cè)到zookeeper。一旦服務(wù)器掛掉,zookeeper自動(dòng)刪除臨時(shí)znode。
我們創(chuàng)建ServiceRegister.java實(shí)現(xiàn)Runnable,數(shù)據(jù)服務(wù)啟動(dòng)的時(shí)候,單獨(dú)線程運(yùn)行此代碼,實(shí)現(xiàn)注冊(cè)到zookeeper邏輯。維系和zookeeper的鏈接。
檢索服務(wù)器
檢索服務(wù)器,代碼設(shè)計(jì)完全采用官方案例,所以詳細(xì)的代碼解讀請(qǐng)參考上面提到的兩篇文章,這里只做下簡(jiǎn)述。
代碼有兩個(gè)類DataMonitor和LoadSaidsExecutor。LoadSaidsExecutor是啟動(dòng)入口,他來(lái)啟動(dòng)DataMonitor監(jiān)控zookeeper節(jié)點(diǎn)變化。DataMonitor負(fù)責(zé)監(jiān)控,初次啟動(dòng)和發(fā)現(xiàn)變化時(shí),調(diào)用LoadSaidsExecutor的方法來(lái)加載最新的數(shù)據(jù)服務(wù)器列表信息。
DataMonitor和LoadSaidsExecutor的工作流程如下:

Excutor把自己注冊(cè)為DataMonitor的監(jiān)聽(tīng)
DataMonitor實(shí)現(xiàn)watcher接口,并監(jiān)聽(tīng)znode
znode變化時(shí),觸發(fā)DataMonitor的監(jiān)聽(tīng)
回調(diào)回調(diào)中通過(guò)ZooKeeper.exist() 再次監(jiān)聽(tīng)znode
上一步exist的回調(diào)方法中,調(diào)用監(jiān)聽(tīng)自己的Executor,執(zhí)行業(yè)務(wù)邏輯6
Executor啟新的線程加載數(shù)據(jù)服務(wù)器信息到內(nèi)存中
注意:圖為以前文章配圖。圖里應(yīng)該把6,7步改為文字描述的第6步。
檢索服務(wù)啟動(dòng)的時(shí)候,單獨(dú)線程運(yùn)行LoadSaIdsExecutor。LoadSaIdsExecutor會(huì)阻塞線程,轉(zhuǎn)為事件驅(qū)動(dòng)。
總結(jié)
我們通過(guò)一個(gè)例子,展示了zookeeper在實(shí)際系統(tǒng)中的應(yīng)用,通過(guò)zookeeper解決了分布式系統(tǒng)的問(wèn)題。其實(shí)以上代碼還有很大的優(yōu)化空間。我能想到如下兩點(diǎn):
1、數(shù)據(jù)服務(wù)器會(huì)假死或者變慢,但和zk鏈接還在,并不會(huì)從zk中刪除,但已經(jīng)拖慢了集群的速度。解決此問(wèn)題,我們可以在數(shù)據(jù)服務(wù)器中加入定時(shí)任務(wù),通過(guò)定時(shí)跑真實(shí)業(yè)務(wù)查詢,監(jiān)控服務(wù)器狀態(tài),一旦達(dá)到設(shè)定的紅線閾值,強(qiáng)制下線,而不是等到server徹底死掉。
2、檢索服務(wù)器每個(gè)server都監(jiān)控zookeeper同一個(gè)節(jié)點(diǎn),在節(jié)點(diǎn)變化時(shí)會(huì)出現(xiàn)羊群效應(yīng)。當(dāng)然,檢索服務(wù)器如果數(shù)量不多還好。其實(shí)檢索服務(wù)器應(yīng)該通過(guò)zookeeper做一個(gè)leader選舉,只由leader去監(jiān)控zookeeper節(jié)點(diǎn)變化,更新redis中的數(shù)據(jù)服務(wù)器列表緩存即可。
附:完整代碼
數(shù)據(jù)服務(wù)端代碼
ServiceRegister.java
public class ServiceRegister implements Runnable{
private ZooKeeper zk;
private static final String ZNODE = "/sas";
private static final String SA_NODE_PREFIX = "sa_";
private String hostName="localhost:2181";
public void setHostName(String hostName) {
this.hostName = hostName;
}
public ServiceRegister() throws IOException {
zk = new ZooKeeper(hostName, 10000,null);
}
@Override
public void run() {
try {
createSaNode();
synchronized (this) {
wait();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//測(cè)試用
public static void main(String[] args){
try {
new ServiceRegister().run();
} catch (IOException e) {
e.printStackTrace();
}
}
//創(chuàng)建子節(jié)點(diǎn)
private String createSaNode() throws KeeperException, InterruptedException {
// 如果根節(jié)點(diǎn)不存在,則創(chuàng)建根節(jié)點(diǎn)
Stat stat = zk.exists(ZNODE, false);
if (stat == null) {
zk.create(ZNODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
String hostName = System.getenv("HOSTNAME");
// 創(chuàng)建EPHEMERAL_SEQUENTIAL類型節(jié)點(diǎn)
String saPath = zk.create(ZNODE + "/" + SA_NODE_PREFIX,
hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
return saPath;
}
}檢索服務(wù)端代碼
DataMonitor.java
public class DataMonitor implements Watcher, AsyncCallback.ChildrenCallback {
ZooKeeper zk;
String znode;
Watcher chainedWatcher;
boolean dead;
DataMonitorListener listener;
List<String> prevSaIds;
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// 這是整個(gè)監(jiān)控的真正開(kāi)始,通過(guò)獲取children節(jié)點(diǎn)開(kāi)始。設(shè)置了本對(duì)象為監(jiān)控對(duì)象,回調(diào)對(duì)象也是本對(duì)象。以后均是事件驅(qū)動(dòng)。
zk.getChildren(znode, true, this, null);
}
/**
* 其他和monitor產(chǎn)生交互的類,需要實(shí)現(xiàn)此listener
*/
public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void changed(List<String> saIds);
/**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
}
/*
*監(jiān)控/saids的回調(diào)函數(shù)。除了處理異常外。
*如果發(fā)生變化,和構(gòu)造函數(shù)中一樣,通過(guò)getChildren,再次監(jiān)控,并處理children節(jié)點(diǎn)變化后的業(yè)務(wù)
*/
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
dead = true;
listener.closing(Code.SESSIONEXPIRED.intValue());
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.getChildren(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
//拿到Children節(jié)點(diǎn)后的回調(diào)函數(shù)。
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.getChildren(znode, true, this, null);
return;
}
List<String> saIds = null;
//如果存在,再次查詢到最新children,此時(shí)僅查詢,不要設(shè)置監(jiān)控了
if (exists) {
try {
saIds = zk.getChildren(znode,null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
//拿到最新saids后,通過(guò)listener(executor),加載Saids。
if ((saIds == null && saIds != prevSaIds)
|| (saIds != null && !saIds.equals(prevSaIds))) {
listener.changed(saIds);
prevSaIds = saIds;
}
}
}LoadSaIdsExecutor.java
public class LoadSaIdsExecutor
implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
private DataMonitor dm;
private ZooKeeper zk;
private static final String znode = "/sas";
private String hostName="localhost:2181";
public void setHostName(String hostName) {
this.hostName = hostName;
}
/*
*初始化zookeeper及DataMonitor
* 自己作為zookeeper的監(jiān)控者,監(jiān)控和zookeeper連接的變化
* 自己作為DataMonitor的listener。當(dāng)dm監(jiān)控到變化時(shí)會(huì)調(diào)用executor執(zhí)行業(yè)務(wù)操作
*/
public LoadSaIdsExecutor() throws KeeperException, IOException {
zk = new ZooKeeper(hostName, 300000, this);
dm = new DataMonitor(zk, znode, null, this);
}
/**
* 入口方法,測(cè)試用。
*/
public static void main(String[] args) {
try {
new LoadSaIdsExecutor().run();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 作為單獨(dú)線程運(yùn)行
*/
public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}
/*
*作為zookeeper監(jiān)控者的回調(diào),直接傳遞事件給monitor的回調(diào)函數(shù)統(tǒng)一處理
*/
@Override
public void process(WatchedEvent event) {
dm.process(event);
}
/*
*當(dāng)關(guān)閉時(shí),讓線程線繼續(xù)走完
*/
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}
/*
*監(jiān)控到/saids變化后的處理類
*/
static class SaIdsLoader extends Thread {
List<String> saIds = null;
//構(gòu)造對(duì)象后直接啟動(dòng)線程
public SaIdsLoader(List<String> saIds){
this.saIds = saIds;
start();
}
public void run() {
System.out.println("------------加載開(kāi)始------------");
//業(yè)務(wù)處理的地方
if(saIds!=null){
saIds.forEach(id->{
System.out.println(id);
});
}
System.out.println("------------加載結(jié)束------------");
}
}
/*
*作為listener對(duì)外暴露的方法,在節(jié)點(diǎn)/saids變化時(shí)被調(diào)用。
*/
@Override
public void changed(List<String> data) {
new SaIdsLoader(data);
}
}以上就是ZooKeeper開(kāi)發(fā)實(shí)際應(yīng)用案例實(shí)戰(zhàn)的詳細(xì)內(nèi)容,更多關(guān)于ZooKeeper開(kāi)發(fā)應(yīng)用案例的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring中使用atomikos+druid實(shí)現(xiàn)經(jīng)典分布式事務(wù)的方法
這篇文章主要介紹了Spring中使用atomikos+druid實(shí)現(xiàn)經(jīng)典分布式事務(wù)的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-06-06
java中實(shí)現(xiàn)遞歸計(jì)算二進(jìn)制表示中1的個(gè)數(shù)
這是一個(gè)很有意思的問(wèn)題,是在面試中特別容易被問(wèn)到的問(wèn)題之一,解決這個(gè)問(wèn)題第一想法肯定是一位一位的去判斷,是1計(jì)數(shù)器+1,否則不操作,跳到下一位,十分容易,編程初學(xué)者就可以做得到!2015-05-05
教你如何使用Java8實(shí)現(xiàn)菜單樹(shù)形數(shù)據(jù)
今天給大家?guī)?lái)的是關(guān)于JAVA的相關(guān)知識(shí),文中圍繞著如何使用Java8實(shí)現(xiàn)菜單樹(shù)形數(shù)據(jù)展開(kāi),文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06
spring boot如何實(shí)現(xiàn)切割分片上傳
這篇文章主要介紹了spring boot如何實(shí)現(xiàn)切割分片上傳,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
java?kafka如何動(dòng)態(tài)設(shè)置用戶讀寫權(quán)限
這篇文章主要介紹了java?kafka如何動(dòng)態(tài)設(shè)置用戶讀寫權(quán)限問(wèn)題,具有很好的參考家價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08
只需兩步實(shí)現(xiàn)Eclipse+Maven快速構(gòu)建第一個(gè)Spring Boot項(xiàng)目
這篇文章主要介紹了只需兩步實(shí)現(xiàn)Eclipse+Maven快速構(gòu)建第一個(gè)Spring Boot項(xiàng)目,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-12-12
關(guān)于Selenium的UI自動(dòng)化測(cè)試屏幕截圖功能實(shí)例代碼
今天小編就為大家分享一篇關(guān)于Selenium的UI自動(dòng)化測(cè)試屏幕截圖功能實(shí)例代碼,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-05-05
Spring?boot框架JWT實(shí)現(xiàn)用戶賬戶密碼登錄驗(yàn)證流程
這篇文章主要介紹了Springboot框架JWT實(shí)現(xiàn)用戶賬戶密碼登錄驗(yàn)證,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-06-06
Java中的springboot監(jiān)聽(tīng)事件和處理事件詳解
這篇文章主要介紹了Java中的springboot監(jiān)聽(tīng)事件和處理事件,這個(gè)示例展示了如何在Spring Boot應(yīng)用中定義自定義事件、發(fā)布事件以及監(jiān)聽(tīng)事件,需要的朋友可以參考下2024-07-07

