elasticsearch集群cluster?discovery可配式模塊示例分析
前言
elasticsearch cluster實(shí)現(xiàn)了自己發(fā)現(xiàn)機(jī)制zen。Discovery功能主要包括以下幾部分內(nèi)容:master選舉,master錯誤探測,集群中其它節(jié)點(diǎn)探測,單播多播ping。本篇會首先概述以下Discovery這一部分的功能,然后介紹節(jié)點(diǎn)檢測。其它內(nèi)容會在接下來介紹。
Discovery模塊的概述
discovery是可配式模塊,官方支持亞馬遜的Azure discovery,Google Compute Engine,EC2 Discovery三種發(fā)現(xiàn)機(jī)制,根據(jù)插件規(guī)則完全可以自己實(shí)現(xiàn)其它的發(fā)現(xiàn)機(jī)制。整個模塊通過實(shí)現(xiàn)guice的DiscoveryModule對外提供模塊的注冊和啟動, 默認(rèn)使用zen discovery。發(fā)現(xiàn)模塊對外接口為DiscoveryService,它的方法如下所示:

它本質(zhì)上是discovery的一個代理,所有的功能最終都是由所綁定的discovery所實(shí)現(xiàn)的。節(jié)點(diǎn)啟動時(shí)通過DiscoveryModule獲取DiscoveryService,然后啟動DiscoveryService,DiscoveryService啟動綁定的Discovery,整個功能模塊就完成了加載和啟動。這也是elasticsearch所有模塊的實(shí)現(xiàn)方式,通過module對外提供綁定和獲取,通過service接口對外提供模塊的功能,在后面的分析中會經(jīng)常遇到。
cluster節(jié)點(diǎn)探測
接下來分析cluster的一個重要功能就是節(jié)點(diǎn)探測。cluster中不能沒有master節(jié)點(diǎn),因此集群中所有節(jié)點(diǎn)都要周期探測master節(jié)點(diǎn),一旦無法檢測到,將會進(jìn)行master選舉。同時(shí)作為master,對于節(jié)點(diǎn)變動也要時(shí)刻關(guān)注,因此它需要周期性探測集群中所有節(jié)點(diǎn),確保及時(shí)剔除已經(jīng)宕機(jī)的節(jié)點(diǎn)。這種相互間的心跳檢測就是cluster的faultdetection。下圖是faultdetection的繼承關(guān)系:

有兩種實(shí)現(xiàn)方式,分別是master探測集群中其它節(jié)點(diǎn)和其它節(jié)點(diǎn)對master節(jié)點(diǎn)的探測。
FaultDetection只要一個抽象方法handleTransportDisconnect,該方法在內(nèi)部類FDConnectionListener中被調(diào)用。在elasticsearch中大量使用了listener的異步方式,異步可以極大提升系統(tǒng)性能。它的代碼如下所示:
private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}faultdetection啟動時(shí)會注冊相應(yīng)的FDConnetionListener,當(dāng)探測到節(jié)點(diǎn)丟失,會通過onNodeDisconnected方法回調(diào)對于的handleTransportDisconnect進(jìn)行處理。
MasterFaultDetection的啟動代碼
private?void?innerStart(final?DiscoveryNode masterNode) {
this.masterNode = masterNode;
this.retryCount = 0;
this.notifiedMasterFailure.set(false);
// 嘗試連接master節(jié)點(diǎn)
try {
transportService.connectToNode(masterNode);
} catch (final Exception e) {
// 連接失敗通知masterNode失敗
notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
return;
}
//關(guān)閉之前的masterping,重啟新的masterping
if (masterPinger != null) {
masterPinger.stop();
}
this.masterPinger = new MasterPinger();
// 周期之后啟動masterPing,這里并沒有周期啟動masterPing,只是設(shè)定了延遲時(shí)間。
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
}代碼有有詳細(xì)注釋,就不再過多解釋。
master連接失敗的邏輯
代碼如下:
private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
if (notifiedMasterFailure.compareAndSet(false, true)) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
//通知所有l(wèi)istener master丟失
for (Listener listener : listeners) {
listener.onMasterFailure(masterNode, reason);
}
}
});
stop("master failure, " + reason);
}
}在ZenDiscovery中實(shí)現(xiàn)了listener.onMasterFailure接口。會進(jìn)行master丟失的相關(guān)處理,在后面再分析。
MasterPing的關(guān)鍵代碼
private class MasterPinger implements Runnable {
private volatile boolean running = true;
public void stop() {
this.running = false;
}
@Override
public void run() {
if (!running) {
// return and don't spawn...
return;
}
final DiscoveryNode masterToPing = masterNode;
final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override
public MasterPingResponseResponse newInstance() {
return new MasterPingResponseResponse();
}
@Override
public void handleResponse(MasterPingResponseResponse response) {
if (!running) {
return;
}
// reset the counter, we got a good result
MasterFaultDetection.this.retryCount = 0;
// check if the master node did not get switched on us..., if it did, we simply return with no reschedule
if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
// 啟動新的ping周期
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
}
}
@Override
public void handleException(TransportException exp) {
if (!running) {
return;
}
synchronized (masterNodeMutex) {
// check if the master node did not get switched on us...
if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
handleTransportDisconnect(masterToPing);
return;
} else if (exp.getCause() instanceof NoLongerMasterException) {
logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
notifyMasterFailure(masterToPing, "no longer master");
return;
} else if (exp.getCause() instanceof NotMasterException) {
logger.debug("[master] pinging a master {} that is not the master", masterNode);
notifyMasterFailure(masterToPing, "not master");
return;
} else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
return;
}
int retryCount = ++MasterFaultDetection.this.retryCount;
logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
if (retryCount >= pingRetryCount) {
logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
// not good, failure
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
}
}
}
}
);
}
}MasterPing是一個線程,在innerStart的方法中沒有設(shè)定周期啟動masterping,但是masterping需要周期進(jìn)行,這個秘密就在run 方法中,如果ping成功就會重啟一個新的ping。這樣既保證了ping線程的唯一性同時(shí)也保證了ping的順序和間隔。
ping的方式跟之前一樣是也是通過transport發(fā)送一個masterpingrequest,進(jìn)行一個連接。節(jié)點(diǎn)收到該請求后,如果已不再是master會拋出NotMasterException,狀態(tài)更新出差會拋出其它異常,異常會通過。否則會正常響應(yīng)notifyMasterFailure方法處理跟啟動邏輯一樣。
對于網(wǎng)絡(luò)問題導(dǎo)致的無響應(yīng)情況,會調(diào)用handleTransportDisconnect(masterToPing)方法處理。masterfaultDetection對該方法的實(shí)現(xiàn)如下:
protected void handleTransportDisconnect(DiscoveryNode node) {
//這里需要同步
synchronized (masterNodeMutex) {
//master 已經(jīng)換成其它節(jié)點(diǎn),就沒必要再連接
if (!node.equals(this.masterNode)) {
return;
}
if (connectOnNetworkDisconnect) {
try {
//嘗試再次連接
transportService.connectToNode(node);
// if all is well, make sure we restart the pinger
if (masterPinger != null) {
masterPinger.stop();
}
//連接成功啟動新的masterping
this.masterPinger = new MasterPinger();
// we use schedule with a 0 time value to run the pinger on the pool as it will run on later
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
} catch (Exception e) {
//連接出現(xiàn)異常,啟動master節(jié)點(diǎn)丟失通知
logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
}
} else {
//不需要重連,通知master丟失。
logger.trace("[master] [{}] transport disconnected", node);
notifyMasterFailure(node, "transport disconnected");
}
}
}這就是masterfaultDetection的整個流程:啟動中如果master丟失則通知節(jié)點(diǎn)丟失,否則在一定延遲(3s)后啟動masterping,masterping線程嘗試連接master節(jié)點(diǎn),如果master節(jié)點(diǎn)網(wǎng)絡(luò)失聯(lián),嘗試再次連接。master節(jié)點(diǎn)收到masterpingrequest后首先看一下自己還是不是master,如果不是則拋出異常,否則正?;貞?yīng)。節(jié)點(diǎn)如果收到響應(yīng)是異常則啟動master丟失通知,否則此次ping結(jié)束。在一定延遲后啟動新的masterping線程。
NodeFaultDetection的邏輯跟實(shí)現(xiàn)上跟MasterFualtDetetion相似,區(qū)別主要在于ping異常處理上。當(dāng)某個節(jié)點(diǎn)出現(xiàn)異?;蛘邲]有響應(yīng)時(shí),會啟動節(jié)點(diǎn)丟失機(jī)制,只是受到通知后的處理邏輯不通。就不再詳細(xì)分析,有興趣可以參考具體代碼,希望大家以后多多支持腳本之家!
相關(guān)文章
Java之Swagger配置掃描接口以及開關(guān)案例講解
這篇文章主要介紹了Java之Swagger配置掃描接口以及開關(guān)案例講解,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08
基于JavaScript動態(tài)規(guī)劃編寫一個益智小游戲
最近在學(xué)習(xí)動態(tài)規(guī)劃相關(guān)的知識,所以本文將利用動態(tài)規(guī)劃編寫一個簡單的益智小游戲,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2023-06-06
SpringSecurity實(shí)現(xiàn)踢出指定用戶的示例
SpringSecurity中使用SessionRegistryImpl類可以獲取session信息并踢出用戶,這篇文章主要介紹了SpringSecurity實(shí)現(xiàn)踢出指定用戶的示例,需要的朋友可以參考下2025-03-03
java傳入時(shí)間戳返回LocalDateTime的實(shí)現(xiàn)方法
這篇文章主要介紹了java傳入時(shí)間戳返回LocalDateTime的實(shí)現(xiàn)方法,在Java中將時(shí)間戳轉(zhuǎn)換為LocalDateTime時(shí)需要注意時(shí)區(qū)問題,因?yàn)長ocalDateTime不包含時(shí)區(qū)信息,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-11-11
Mybatis?Plus?中的LambdaQueryWrapper示例詳解
這篇文章主要介紹了Mybatis?Plus?中的LambdaQueryWrapper,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-03-03
Spring?Boot?@Autowired?@Resource屬性賦值時(shí)機(jī)探究
這篇文章主要為大家介紹了Spring?Boot?@Autowired?@Resource屬性賦值時(shí)機(jī),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07

