elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機制分析
zenDiscovery實現(xiàn)機制
ping是集群發(fā)現(xiàn)的基本手段,通過在網(wǎng)絡上廣播或者指定ping某些節(jié)點獲取集群信息,從而可以找到集群的master加入集群。zenDiscovery實現(xiàn)了兩種ping機制:廣播與單播。本篇將詳細分析一些這MulticastZenPing機制的實現(xiàn)為后面的集群發(fā)現(xiàn)和master選舉做好鋪墊。
廣播的過程
首先看一下廣播(MulticastZenPing),廣播的原理很簡單,節(jié)點啟動后向網(wǎng)絡發(fā)送廣播信息,任何收到的節(jié)點只要集群名字相同都應該對此廣播信息作出回應。這樣該節(jié)點就獲取了集群的相關信息。它定義了一個action:"internal:discovery/zen/multicast"和廣播的信息頭:INTERNAL_HEADER 。之前說過NettyTransport是cluster通信的基礎,但是廣播卻沒有使它。它使用了java的MulticastSocket。這里簡單的介紹一下MulticastSocket的使用。它是一個UDP 機制的socket,用來進行多個數(shù)據(jù)包的廣播。它可以幫到一個ip形成一個group,任何MulticastSocket都可以join進來,組內的socket發(fā)送的信息會被訂閱了改組的所有機器接收到。elasticsearch對其進行了封裝形成了MulticastChannel,有興趣可以參考相關源碼。
首先看一下MulticastZenPing的幾個輔助內部類:

它總共定義了4個內部類,這些內部類和它一起完成廣播功能。FinalizingPingCollection是一pingresponse的容器,所有的響應都用它來存儲。MulticastPingResponseRequestHandler它是response處理類,類似于之前所說的nettytransportHandler,它雖然使用的不是netty,但是它也定義了一個messageReceived的方法,當收到請求時直接返回一個response。
MulticastPingResponse就不用細說了,它就是一個響應類。最后要著重說一下Receiver類,因為廣播并不是使用NettyTransport,因此對于消息處理邏輯都在Receiver中。在初始化MulticastZenPing時會將receiver注冊進去。
protected void doStart() throws ElasticsearchException {
try {
....
multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
new Receiver());//將receiver注冊到channel中
} catch (Throwable t) {
....
}
}Receiver類基礎了Listener,實現(xiàn)了3個方法,消息經(jīng)過onMessage方法區(qū)分,如果是內部ping則使用handleNodePingRequest方法處理,否則使用handleExternalPingRequest處理,區(qū)分方法很簡單,就是讀取信息都看它是否符合所定義的INTERNAL_HEADER 信息頭。
nodeping處理代碼
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
....
final DiscoveryNodes discoveryNodes = contextProvider.nodes();
final DiscoveryNode requestingNode = requestingNodeX;
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
// 自身發(fā)出的ping,忽略
return;
}
//只接受本集群ping
if (!requestClusterName.equals(clusterName)) {
...return;
}
// 兩個client間不需要ping
if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
}
//新建一個response
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
multicastPingResponse.id = id;
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
//無法連接的情況
if (!transportService.nodeConnected(requestingNode)) {
// do the connect and send on a thread pool
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
} catch (Exception e) {
if (lifecycle.started()) {
logger.warn("failed to connect to requesting node {}", e, requestingNode);
}
}
}
});
} else {
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
if (lifecycle.started()) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
}
});
}
}
}另外的一個方法是處理外部ping信息,處理過程是返回cluster的信息(這種外部ping的具體作用沒有研究不是太清楚)。以上是響應MulticastZenPing的過程,收到其它節(jié)點的響應信息后它會把本節(jié)點及集群的master節(jié)點相關信息返回給廣播節(jié)點。這樣廣播節(jié)點就獲知了集群的相關信息。在MulticastZenPing類中還有一個類 MulticastPingResponseRequestHandler,它的作用是廣播節(jié)點對其它節(jié)點對廣播信息響應的回應,廣播節(jié)點的第二次發(fā)送信息的過程。它跟其它TransportRequestHandler一樣它有messageReceived方法,在啟動時注冊到transportserver中,只處理一類action:"internal:discovery/zen/multicast"。
ping請求的發(fā)送策略
代碼如下:
public void ping(final PingListener listener, final TimeValue timeout) {
....
//產(chǎn)生一個id
final int id = pingIdGenerator.incrementAndGet();
try {
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);//第一次發(fā)送ping請求
// 等待時間的1/2后再次發(fā)送一個請求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
sendPingRequest(id);
//再過1/2時間再次發(fā)送一個請求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
//最后一次發(fā)送請求,超時的1/4后
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
} catch (Exception e) {
logger.warn("failed to ping", e);
finalizePingCycle(id, listener);
}
}發(fā)送過程主要是調用sendPingRequest(id)方法,在該方法中會將id,信息頭,版本,本地節(jié)點信息一起寫入到BytesStreamOutput中然后將其進行廣播,這個廣播信息會被其它機器上的Receiver接收并處理,并且響應該ping請求。另外一個需要關注的是以上加說明的部分,它通過鏈時的定期發(fā)送請求,在等待時間內可能會發(fā)出4次請求,這種發(fā)送方式會造成大量的ping請求重復,幸好ping的資源消耗小,但是好處是可以盡可能保證在timeout這個時間段內集群的新增節(jié)點都能收到這個ping信息。在單播中也采用了該策略。
總結
廣播的過程:廣播使用的是jdk的MulticastSocket,在timeout時間內4次發(fā)生ping請求,ping請求包括一個id,信息頭,本地節(jié)點的一些信息;這些信息在其它節(jié)點中被接收到交給Receiver處理,Receiver會將集群的master和本機的相關信息通過transport返回給廣播節(jié)點。廣播節(jié)點收到這些信息后會理解使用transport返回一個空的response。至此一個廣播過程完成。
在節(jié)點分布在多個網(wǎng)段時,廣播就失效了,因為廣播信息不可達。這個時間就需要使用單播去ping指定的節(jié)點獲取cluster的相關信息。這就是單播的用處。單播使用的是NettyTransport,它會使用跟廣播一樣的鏈式請求向指定的節(jié)點發(fā)送請求。信息的處理方式是之前所介紹的NettyTransport標準的信息處理過程。
以上就是elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機制分析的詳細內容,更多關于elasticsearch集群發(fā)現(xiàn)zendiscovery Ping的資料請關注腳本之家其它相關文章!
相關文章
Spring Boot產(chǎn)生環(huán)形注入的解決方案
這篇文章主要介紹了Spring Boot產(chǎn)生環(huán)形注入的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
springboot中request和response的加解密實現(xiàn)代碼
這篇文章主要介紹了springboot中request和response的加解密實現(xiàn),在springboot中提供了RequestBodyAdviceAdapter和ResponseBodyAdvice,利用這兩個工具可以非常方便的對請求和響應進行預處理,需要的朋友可以參考下2022-06-06

