elasticsearch的zenDiscovery和master選舉機制原理分析
前言
上一篇通過 ElectMasterService源碼,分析了master選舉的原理的大部分內(nèi)容:master候選節(jié)點ID排序保證選舉一致性及通過設置最小可見候選節(jié)點數(shù)目避免brain split。節(jié)點排序后選舉只能保證局部一致性,如果發(fā)生節(jié)點接收到了錯誤的集群狀態(tài)就會選舉出錯誤的master,因此必須有其它措施來保證選舉的一致性。這就是上一篇所提到的第二點:被選舉的數(shù)量達到一定的數(shù)目同時自己也選舉自己,這個節(jié)點才能成為master。這一點體現(xiàn)在zenDiscovery中,本篇將結(jié)合節(jié)點的發(fā)現(xiàn)過程進一步介紹master選舉機制。
節(jié)點啟動后首先啟動join線程,join線程會尋找cluster的master節(jié)點,如果集群之前已經(jīng)啟動,并且運行良好,則試圖連接集群的master節(jié)點,加入集群。否則(集群正在啟動)選舉master節(jié)點,如果自己被選為master,則向集群中其它節(jié)點發(fā)送一個集群狀態(tài)更新的task,如果master是其它節(jié)點則試圖加入該集群。
join的代碼
private void innerJoinCluster() {
DiscoveryNode masterNode = null;
final Thread currentThread = Thread.currentThread();
//一直阻塞直到找到master節(jié)點,在集群剛剛啟動,或者集群master丟失的情況,這種阻塞能夠保證集群一致性
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
masterNode = findMaster();
}
//有可能自己會被選舉為master(集群啟動,或者加入時正在選舉)
if (clusterService.localNode().equals(masterNode)) {
//如果本身是master,則需要向其它所有節(jié)點發(fā)送集群狀態(tài)更新
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
//選舉時錯誤的,之前的master狀態(tài)良好,則不更新狀態(tài),仍舊使用之前狀態(tài)。
if (currentState.nodes().masterNode() != null) {
return currentState;
}
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().localNode().id());
// update the fact that we are the master...
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result result = allocationService.reroute(currentState);
return ClusterState.builder(currentState).routingResult(result).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (newState.nodes().localNodeMaster()) {
// we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
joinThreadControl.markThreadAsDone(currentThread);
nodesFD.updateNodesAndPing(newState); // start the nodes FD
} else {
// if we're not a master it means another node published a cluster state while we were pinging
// make sure we go through another pinging round and actively join it
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
sendInitialStateEventIfNeeded();
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("cluster joins counter set to [{}] (elected as master)", count);
}
});
} else {
// 找到的節(jié)點不是我,試圖連接該master
final boolean success = joinElectedMaster(masterNode);
// finalize join through the cluster state update thread
final DiscoveryNode finalMasterNode = masterNode;
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (!success) {
// failed to join. Try again...
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return currentState;
}
if (currentState.getNodes().masterNode() == null) {
// Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
// a valid master.
logger.debug("no master node is set, despite of join request completing. retrying pings.");
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return currentState;
}
if (!currentState.getNodes().masterNode().equals(finalMasterNode)) {
return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
}
// Note: we do not have to start master fault detection here because it's set at {@link #handleNewClusterStateFromMaster }
// when the first cluster state arrives.
joinThreadControl.markThreadAsDone(currentThread);
return currentState;
}
@Override
public void onFailure(String source, @Nullable Throwable t) {
logger.error("unexpected error while trying to finalize cluster join", t);
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
});
}
}以上就是join的過程。zenDiscovery在啟動時會啟動一個join線程,這個線程調(diào)用了該方法。同時在節(jié)點離開,master丟失等情況下也會重啟這一線程仍然運行join方法。
findMaster方法
這個方法體現(xiàn)了master選舉的機制。代碼如下:
private DiscoveryNode findMaster() {
//ping集群中的節(jié)點
ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);
if (fullPingResponses == null) {return null;
}// 過濾所得到的ping響應,慮除client節(jié)點,單純的data節(jié)點
List<ZenPing.PingResponse> pingResponses = Lists.newArrayList();
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
DiscoveryNode node = pingResponse.node();
if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
// filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
} else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
// filter out data node that is not also master
} else {
pingResponses.add(pingResponse);
}
}
final DiscoveryNode localNode = clusterService.localNode();
List<DiscoveryNode> pingMasters = newArrayList();
//獲取所有ping響應中的master節(jié)點,如果master節(jié)點是節(jié)點本身則過濾掉。pingMasters列表結(jié)果要么為空(本節(jié)點是master)要么是同一個節(jié)點(出現(xiàn)不同節(jié)點則集群出現(xiàn)了問題
不過沒關系,后面會進行選舉)
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.master() != null) {
if (!localNode.equals(pingResponse.master())) {
pingMasters.add(pingResponse.master());
}
}
}
// nodes discovered during pinging
Set<DiscoveryNode> activeNodes = Sets.newHashSet();
// nodes discovered who has previously been part of the cluster and do not ping for the very first time
Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
Version minimumPingVersion = localNode.version();
for (ZenPing.PingResponse pingResponse : pingResponses) {
activeNodes.add(pingResponse.node());
minimumPingVersion = Version.smallest(pingResponse.node().version(), minimumPingVersion);
if (pingResponse.hasJoinedOnce() != null && pingResponse.hasJoinedOnce()) {
joinedOnceActiveNodes.add(pingResponse.node());
}
}//本節(jié)點暫時是master也要加入候選節(jié)點進行選舉
if (localNode.masterNode()) {
activeNodes.add(localNode);
long joinsCounter = clusterJoinsCounter.get();
if (joinsCounter > 0) {
logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
joinedOnceActiveNodes.add(localNode);
}
}
//pingMasters為空,則本節(jié)點是master節(jié)點,
if (pingMasters.isEmpty()) {
if (electMaster.hasEnoughMasterNodes(activeNodes)) {//保證選舉數(shù)量,說明有足夠多的節(jié)點選舉本節(jié)點為master,但是這還不夠,本節(jié)點還需要再選舉一次,如果
本次選舉節(jié)點仍舊是自己,那么本節(jié)點才能成為master。這里就體現(xiàn)了master選舉的第二條原則。
DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);
if (master != null) {
return master;
}
return electMaster.electMaster(activeNodes);
} else {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.trace("not enough master nodes [{}]", activeNodes);
return null;
}
} else {
//pingMasters不為空(pingMasters列表中應該都是同一個節(jié)點),本節(jié)點沒有被選舉為master,那就接受之前的選舉。
return electMaster.electMaster(pingMasters);
}
}上面的重點部分都做了標注,就不再分析。除了findMaster方法,還有一個方法也體現(xiàn)了master選舉,那就是handleMasterGone。下面是它的部分代碼,提交master丟失task部分,
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE,?new?ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
//獲取到當前集群狀態(tài)下的所有節(jié)點
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(currentState.nodes())
// make sure the old master node, which has failed, is not part of the nodes we publish
.remove(masterNode.id())
.masterNodeId(null).build();
//rejoin過程仍然是重復findMaster過程
if (rejoin) {
return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master left (reason = " + reason + ")");
}
//無法達到選舉數(shù)量,進行findMaster過程
if (!electMaster.hasEnoughMasterNodes(discoveryNodes)) {
return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "not enough master nodes after master left (reason = " + reason + ")");
}
//在當前集群狀態(tài)下,如果候選節(jié)點數(shù)量達到預期數(shù)量,那么選舉出來的節(jié)點一定是同一個節(jié)點,因為所有的節(jié)點看到的集群states是一致的
final DiscoveryNode electedMaster = electMaster.electMaster(discoveryNodes); // elect master
final DiscoveryNode localNode = currentState.nodes().localNode();
....
}從以上的代碼可以看到master選舉節(jié)點的應用場景,無論是findMaster還是handlemasterGone,他們都保證了選舉一致性。那就是所選節(jié)點數(shù)量必須要達到一定的數(shù)量,否則不能認為選舉成功,進入等待環(huán)境。如果當前節(jié)點被其它節(jié)點選舉為master,仍然要進行選舉一次以保證選舉的一致性。這樣在保證了選舉數(shù)量同時對候選節(jié)點排序從而保證選舉的一致性。
發(fā)現(xiàn)和加入集群是zenDiscovery的主要功能,當然它還有一些其它功能,如處理節(jié)點離開(handleLeaveRequest),處理master發(fā)送的最小clustersates(handleNewClusterStateFromMaster)等功能。這里就不一一介紹,有興趣請參考相關源碼。
總結(jié)
本節(jié)結(jié)合zenDiscovery,分析了master選舉的另外一部分內(nèi)容。同時zenDiscovery是節(jié)點發(fā)現(xiàn)集群功能的集合,它主要功能是發(fā)現(xiàn)(選舉)出集群的master節(jié)點,并試圖加入集群。同時如果 本機是master還會處理節(jié)點的離開和節(jié)點丟失,如果不是master則會處理來自master的節(jié)點狀態(tài)更新。
以上就是elasticsearch的zenDiscovery和master選舉機制原理分析的詳細內(nèi)容,更多關于elasticsearch的zenDiscovery和master選舉機制的資料請關注腳本之家其它相關文章!
相關文章
SpringBoot Actuator未授權(quán)訪問漏洞的排查和解決方法
Spring Boot Actuator 是開發(fā)和管理生產(chǎn)級 Spring Boot 應用程序的重要工具,它可以幫助你確保應用程序的穩(wěn)定性和性能,本文給大家介紹了SpringBoot Actuator未授權(quán)訪問漏洞的排查和解決方法,需要的朋友可以參考下2024-05-05
使用@value注解取不到application.xml配置文件中的值問題
這篇文章主要介紹了使用@value注解取不到application.xml配置文件中的值問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
springboot多個service互相調(diào)用的事務處理方式
這篇文章主要介紹了springboot多個service互相調(diào)用的事務處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02

