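DolphinScheduler Fault Tolerance Source Code Analysis: Worker
Introduction
The previous article covered the Master's fault-tolerance mechanism in DolphinScheduler. DolphinScheduler is a decentralized architecture in which multiple Masters and multiple Workers act as peers, so the Worker's fault-tolerance mechanism deserves the same attention.
As with the Master analysis, the source code discussed here is from version 3.1.3.
Worker Fault Tolerance: Source Code Analysis
Worker startup and registration
The Worker's entry point is WorkerServer. Once the Worker starts, its run method is executed: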
```java
@PostConstruct
public void run() {
    this.workerRpcServer.start();
    this.workerRpcClient.start();
    this.taskPluginManager.loadPlugin();
    this.workerRegistryClient.setRegistryStoppable(this);
    this.workerRegistryClient.start();
    this.workerManagerThread.start();
    this.messageRetryRunner.start();
    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (!ServerLifeCycleManager.isStopped()) {
            close("WorkerServer shutdown hook");
        }
    }));
}
```
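Here we only care about what this.workerRegistryClient.start() does. It registers the current worker's information in ZooKeeper as an ephemeral node, and it starts a heartbeat task that periodically writes updated worker information to ZooKeeper: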
```java
/**
 * registry
 */
private void registry() {
    WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
    String workerZKPath = workerConfig.getWorkerRegistryPath();
    // remove before persist
    registryClient.remove(workerZKPath);
    registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
    log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
    while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) {
        ThreadUtils.sleep(SLEEP_TIME_MILLIS);
    }
    // sleep 1s, waiting master failover remove
    ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
    workerHeartBeatTask.start();
    log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
}
```
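Ephemeral nodes are what let the master detect a dead worker later. The sketch below illustrates that behaviour. It replaces ZooKeeper with a hypothetical in-memory registry: `InMemoryRegistry`, `WorkerRegistrationSketch` and `expireSession` are illustrative names, not DolphinScheduler classes. The sketch follows the same order as `registry()`: delete any stale node, write the ephemeral node, then confirm it exists. It also shows that once the owning session expires, the worker's node vanishes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a registry such as ZooKeeper (not a DolphinScheduler class).
// Ephemeral nodes are bound to a session and disappear when that session ends.
class InMemoryRegistry {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, String> ownerSession = new HashMap<>(); // path -> session id

    void remove(String path) {
        data.remove(path);
        ownerSession.remove(path);
    }

    void persistEphemeral(String sessionId, String path, String value) {
        data.put(path, value);
        ownerSession.put(path, sessionId);
    }

    boolean exists(String path) {
        return data.containsKey(path);
    }

    // Simulates session expiry (e.g. a crashed worker): every ephemeral node owned by the session is deleted.
    void expireSession(String sessionId) {
        ownerSession.entrySet().removeIf(entry -> {
            boolean owned = entry.getValue().equals(sessionId);
            if (owned) {
                data.remove(entry.getKey());
            }
            return owned;
        });
    }
}

class WorkerRegistrationSketch {
    // Same order as registry(): drop a stale node, write the ephemeral node, confirm it is visible.
    // The real code polls checkNodeExists() and only then starts the heartbeat task.
    static boolean register(InMemoryRegistry registry, String sessionId, String workerPath, String heartBeatJson) {
        registry.remove(workerPath);
        registry.persistEphemeral(sessionId, workerPath, heartBeatJson);
        return registry.exists(workerPath);
    }
}
```

In a real ZooKeeper deployment, expiry happens when the worker stops sending session heartbeats. The resulting node deletion is the REMOVE event that the master reacts to.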
This is essentially the same as the master's registration flow. The worker registers under the following ZooKeeper path, which shares the parent directory /nodes with the master:
```java
// /nodes/worker/ + ip:listenPort
workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress());
```
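The registered content is the worker's current health data: CPU, memory, load and disk information, serialized as JSON from a WorkerHeartBeat. From this data the master can tell whether the worker is healthy and able to accept and execute assigned tasks: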
```java
@Override
public WorkerHeartBeat getHeartBeat() {
    double loadAverage = OSUtils.loadAverage();
    double cpuUsage = OSUtils.cpuUsage();
    int maxCpuLoadAvg = workerConfig.getMaxCpuLoadAvg();
    double reservedMemory = workerConfig.getReservedMemory();
    double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
    int execThreads = workerConfig.getExecThreads();
    int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
    int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory,
            execThreads, workerWaitingTaskCount);
    return WorkerHeartBeat.builder()
            .startupTime(ServerLifeCycleManager.getServerStartupTime())
            .reportTime(System.currentTimeMillis())
            .cpuUsage(cpuUsage)
            .loadAverage(loadAverage)
            .availablePhysicalMemorySize(availablePhysicalMemorySize)
            .maxCpuloadAvg(maxCpuLoadAvg)
            .memoryUsage(OSUtils.memoryUsage())
            .reservedMemory(reservedMemory)
            .diskAvailable(OSUtils.diskAvailable())
            .processId(processId)
            .workerHostWeight(workerConfig.getHostWeight())
            .workerWaitingTaskCount(this.workerWaitingTaskCount.get())
            .workerExecThreadCount(workerConfig.getExecThreads())
            .serverStatus(serverStatus)
            .build();
}
```
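The article does not show `getServerStatus`. The sketch below shows one plausible way the reported metrics could be collapsed into a single status. It assumes two rules: load above `maxCpuLoadAvg` or free memory below `reservedMemory` marks the node abnormal, and more waiting tasks than execution threads marks it busy. The enum values and class name are hypothetical, and the actual rule in 3.1.3 should be checked against the source.

```java
// Hypothetical status values; DolphinScheduler uses its own integer constants.
enum WorkerStatus { NORMAL, BUSY, ABNORMAL }

class WorkerStatusSketch {
    // Assumed rule: resource exhaustion is "abnormal", a backlog is "busy".
    // availableMemory and reservedMemory are expected in the same unit.
    static WorkerStatus classify(double loadAverage, int maxCpuLoadAvg,
                                 double availableMemory, double reservedMemory,
                                 int execThreads, int waitingTaskCount) {
        if (loadAverage > maxCpuLoadAvg || availableMemory < reservedMemory) {
            return WorkerStatus.ABNORMAL;
        }
        if (waitingTaskCount > execThreads) {
            return WorkerStatus.BUSY;
        }
        return WorkerStatus.NORMAL;
    }
}
```

Checking resources before the backlog means an overloaded machine is never reported as merely busy.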
The master watches the worker's node in ZooKeeper
Next, the master monitors the registered worker nodes. As described in the previous article, the master subscribes to the /nodes node after it starts and registers itself. Any change under the child paths /nodes/master or /nodes/worker triggers a callback:
```java
// /nodes
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
```
So when a worker's ephemeral node disappears, the master notices the change. The callback eventually reaches the notify method of MasterRegistryDataListener, which uses the changed path to decide whether the event concerns a master or a worker:
```java
@Override
public void notify(Event event) {
    final String path = event.path();
    if (Strings.isNullOrEmpty(path)) {
        return;
    }
    // monitor master
    if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
        handleMasterEvent(event);
    } else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
        // monitor worker
        handleWorkerEvent(event);
    }
}
```
We already saw this code in the master fault-tolerance article. For worker fault tolerance, the method invoked is handleWorkerEvent:
```java
private void handleWorkerEvent(Event event) {
    final String path = event.path();
    switch (event.type()) {
        case ADD:
            logger.info("worker node added : {}", path);
            break;
        case REMOVE:
            logger.info("worker node deleted : {}", path);
            masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true);
            break;
        default:
            break;
    }
}
```
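Together, `notify` and `handleWorkerEvent` route events from a single subscription by path prefix. The routing can be reduced to a pure function, sketched below. The class name, the `Type` enum and the returned strings are hypothetical. The path constants assume the `/nodes/master` and `/nodes/worker` layout described above.

```java
class RegistryEventRouterSketch {
    // Illustrative constants following the /nodes/master and /nodes/worker layout.
    static final String MASTERS = "/nodes/master";
    static final String WORKERS = "/nodes/worker";

    enum Type { ADD, REMOVE, UPDATE }

    // Returns the action one subscription on /nodes would take for an event.
    static String route(Type type, String path) {
        if (path == null || path.isEmpty()) {
            return "ignore";
        }
        // Appending "/" ensures only children of the directory match, not e.g. "/nodes/worker-x".
        if (path.startsWith(MASTERS + "/")) {
            return "master:" + type;
        }
        if (path.startsWith(WORKERS + "/")) {
            switch (type) {
                case ADD:
                    return "worker-log";       // a new worker is only logged
                case REMOVE:
                    return "worker-failover";  // a removed worker triggers failover
                default:
                    return "ignore";
            }
        }
        return "ignore";
    }
}
```

Subscribing once on the common parent keeps the watch setup simple. The trailing-slash check plays the role of `Constants.SINGLE_SLASH` in the original.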
Next, removeWorkerNodePath extracts the host of the offline worker from the node path and hands it over for failover:
```java
public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
    logger.info("{} node deleted : {}", nodeType, path);
    try {
        // get the node's host from the path
        String serverHost = null;
        if (!StringUtils.isEmpty(path)) {
            serverHost = registryClient.getHostByEventDataPath(path);
            if (StringUtils.isEmpty(serverHost)) {
                logger.error("server down error: unknown path: {}", path);
                return;
            }
            if (!registryClient.exists(path)) {
                logger.info("path: {} not exists", path);
            }
        }
        // failover server
        if (failover) {
            failoverService.failoverServerWhenDown(serverHost, nodeType);
        }
    } catch (Exception e) {
        logger.error("{} server failover failed", nodeType, e);
    }
}
```
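The article does not show `registryClient.getHostByEventDataPath`. The minimal sketch below assumes the host is simply the last segment of the registry path, as in `/nodes/worker/10.0.0.2:1234`. `HostFromPathSketch` is a hypothetical name. Returning `null` mirrors the "unknown path" error branch above.

```java
class HostFromPathSketch {
    // Assumption: the host is the last path segment, e.g. "/nodes/worker/10.0.0.2:1234" -> "10.0.0.2:1234".
    // Returns null when no usable segment exists, which the caller treats as an unknown path.
    static String hostOf(String path) {
        if (path == null || path.isEmpty()) {
            return null;
        }
        String host = path.substring(path.lastIndexOf('/') + 1);
        return host.isEmpty() ? null : host;
    }
}
```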
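For a worker node, failoverServerWhenDown delegates to failoverWorker. The overall worker failover process is:
1- Get the startup time of the worker being failed over. It is used later to tell whether the worker is still offline or has already restarted.
2- Using the failed worker's information, query the task instances that need failover, keeping only those that belong to the current master. This differs from master failover, and it is also why worker failover does not take a lock.
3- Iterate over these task instances and fail each one over. Only tasks dispatched before the worker restarted need failover; new tasks assigned after the worker came back up must not be failed over.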
```java
/**
 * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
 * and failover these tasks.
 * <p>
 * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
 *
 * @param workerHost worker host
 */
public void failoverWorker(@NonNull String workerHost) {
    LOGGER.info("Worker[{}] failover starting", workerHost);
    final StopWatch failoverTimeCost = StopWatch.createStarted();
    // Get the startup time of the worker to fail over; used later to tell whether it is still offline or has restarted.
    // we query the task instance from cache, so that we can directly update the cache
    final Optional<Date> needFailoverWorkerStartTime =
            getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);
    // Query the failed worker's task instances that need failover; only those belonging to this master are returned.
    // Unlike master failover, this is why no lock is taken.
    final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
    if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {
        LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost);
        return;
    }
    LOGGER.info(
            "Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}",
            workerHost,
            needFailoverTaskInstanceList.size(),
            needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList()));
    final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
    for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
        try {
            ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
                    taskInstance.getProcessInstanceId(), k -> {
                        WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId(
                                taskInstance.getProcessInstanceId());
                        if (workflowExecuteRunnable == null) {
                            return null;
                        }
                        return workflowExecuteRunnable.getProcessInstance();
                    });
            // Only tasks dispatched before the worker restarted need failover; tasks assigned after the restart are skipped.
            if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
                LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
                continue;
            }
            LOGGER.info(
                    "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
                    workerHost);
            failoverTaskInstance(processInstance, taskInstance);
            LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
        } catch (Exception ex) {
            LOGGER.info("Worker[{}] failover taskInstance occur exception", workerHost, ex);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
    failoverTimeCost.stop();
    LOGGER.info("Worker[{}] failover finished, useTime:{}ms",
            workerHost,
            failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
```
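`checkTaskInstanceNeedFailover` is called above but not shown. The simplified sketch below is based only on the rules described in this article: skip workflows this master does not own, and skip tasks submitted after the worker restarted. The parameter names, the use of `Instant`, and the order of the checks are assumptions. The task-state filter listed in the Javadoc is omitted.

```java
import java.time.Instant;
import java.util.Optional;

class FailoverDecisionSketch {
    // Illustrative inputs; the real method works on ProcessInstance and TaskInstance objects.
    static boolean needFailover(Optional<Instant> workerStartupTime, // empty if the worker is still down
                                String processInstanceHost,          // null if the workflow is not cached here
                                String localMasterAddress,
                                Instant taskSubmitTime) {
        if (processInstanceHost == null) {
            return false; // no cached workflow on this master
        }
        if (!processInstanceHost.equalsIgnoreCase(localMasterAddress)) {
            return false; // another master owns this workflow and will fail it over
        }
        // The worker is back online and the task arrived after the restart: it is running on the new process.
        if (workerStartupTime.isPresent() && taskSubmitTime != null
                && taskSubmitTime.isAfter(workerStartupTime.get())) {
            return false;
        }
        return true;
    }
}
```

Because each master only touches workflows it owns, two masters never fail over the same task. That is why no distributed lock is needed here.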
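4- For a task that does not run on the master, first kill the related YARN job if killYarnJobWhenTaskFailover is enabled. Then update the taskInstance's state to TaskExecutionStatus.NEED_FAULT_TOLERANCE, build a TaskStateEvent with status TaskExecutionStatus.NEED_FAULT_TOLERANCE and type TASK_STATE_CHANGE, and finally submit the event: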
```java
private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
    TaskMetrics.incTaskInstanceByState("failover");
    boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
    taskInstance.setProcessInstance(processInstance);
    if (!isMasterTask) {
        LOGGER.info("The failover taskInstance is not master task");
        TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                .buildTaskInstanceRelatedInfo(taskInstance)
                .buildProcessInstanceRelatedInfo(processInstance)
                .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                .create();
        if (masterConfig.isKillYarnJobWhenTaskFailover()) {
            // only kill yarn job if exists , the local thread has exited
            LOGGER.info("TaskInstance failover begin kill the task related yarn job");
            ProcessUtils.killYarnJob(logClient, taskExecutionContext);
        }
    } else {
        LOGGER.info("The failover taskInstance is a master task");
    }
    taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
    taskInstance.setFlag(Flag.NO);
    processService.saveTaskInstance(taskInstance);
    // submit the event
    TaskStateEvent stateEvent = TaskStateEvent.builder()
            .processInstanceId(processInstance.getId())
            .taskInstanceId(taskInstance.getId())
            .status(TaskExecutionStatus.NEED_FAULT_TOLERANCE)
            .type(StateEventType.TASK_STATE_CHANGE)
            .build();
    workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
```
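The state change made by `failoverTaskInstance` can be isolated from persistence and thread pools. The sketch below does this with minimal stand-ins for `TaskInstance` and `TaskStateEvent`; all class, field and enum names in it are hypothetical. It shows three things: the YARN-kill condition, the two fields that are updated, and the event that results.

```java
class FailoverTransitionSketch {
    enum Status { RUNNING_EXECUTION, NEED_FAULT_TOLERANCE }

    enum Flag { YES, NO }

    // Minimal stand-in for TaskInstance, holding only the fields this step touches.
    static final class Task {
        final int id;
        final int processInstanceId;
        Status state = Status.RUNNING_EXECUTION;
        Flag flag = Flag.YES;

        Task(int id, int processInstanceId) {
            this.id = id;
            this.processInstanceId = processInstanceId;
        }
    }

    // Minimal stand-in for a TaskStateEvent of type TASK_STATE_CHANGE.
    static final class Event {
        final int processInstanceId;
        final int taskInstanceId;
        final Status status;
        final String type;

        Event(int processInstanceId, int taskInstanceId, Status status, String type) {
            this.processInstanceId = processInstanceId;
            this.taskInstanceId = taskInstanceId;
            this.status = status;
            this.type = type;
        }
    }

    // Same condition as in failoverTaskInstance(): only non-master tasks, and only if the option is on.
    static boolean shouldKillYarnJob(boolean isMasterTask, boolean killYarnJobWhenTaskFailover) {
        return !isMasterTask && killYarnJobWhenTaskFailover;
    }

    // Sets NEED_FAULT_TOLERANCE and flag NO on the instance, then builds the event to submit.
    static Event failover(Task task) {
        task.state = Status.NEED_FAULT_TOLERANCE;
        task.flag = Flag.NO;
        return new Event(task.processInstanceId, task.id, Status.NEED_FAULT_TOLERANCE, "TASK_STATE_CHANGE");
    }
}
```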
Submitting the event looks up the WorkflowExecuteRunnable of the workflow instance the task belongs to, and hands the event to it:
```java
public void submitStateEvent(StateEvent stateEvent) {
    WorkflowExecuteRunnable workflowExecuteThread =
            processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
    if (workflowExecuteThread == null) {
        logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}",
                stateEvent);
        return;
    }
    workflowExecuteThread.addStateEvent(stateEvent);
    logger.info("Submit state event success, stateEvent: {}", stateEvent);
}
```
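Each workflow instance has its own event queue, so routing is essentially a lookup by `processInstanceId`. The sketch below models `processInstanceExecCacheManager` plus each runnable's `stateEvents` as a map of queues. `StateEventRoutingSketch`, `registerWorkflow` and `pending` are hypothetical names, and events are plain strings for brevity.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

class StateEventRoutingSketch {
    // Hypothetical cache: processInstanceId -> that workflow's pending event queue.
    private final Map<Integer, Queue<String>> queues = new ConcurrentHashMap<>();

    void registerWorkflow(int processInstanceId) {
        queues.putIfAbsent(processInstanceId, new ConcurrentLinkedQueue<>());
    }

    // Returns false where the real code logs a warning and drops the event (no cached workflow).
    boolean submit(int processInstanceId, String event) {
        Queue<String> queue = queues.get(processInstanceId);
        if (queue == null) {
            return false;
        }
        queue.add(event);
        return true;
    }

    int pending(int processInstanceId) {
        Queue<String> queue = queues.get(processInstanceId);
        return queue == null ? 0 : queue.size();
    }
}
```

Keeping one queue per workflow means events for the same workflow are processed in order, while different workflows stay independent.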
Handling the fault-tolerance event
The code above submitted an event for each task that needs failover, so some thread must process it. Look at the WorkflowExecuteRunnable class: submitStateEvent calls its addStateEvent method, which puts the event into the class's stateEvents queue:
```java
private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
```
Event processing for WorkflowExecuteRunnable is already running once the master has started, and handleEvents keeps taking events from stateEvents and processing them:
```java
/**
 * handle event
 */
public void handleEvents() {
    if (!isStart()) {
        logger.info(
                "The workflow instance is not started, will not handle its state event, current state event size: {}",
                stateEvents);
        return;
    }
    StateEvent stateEvent = null;
    while (!this.stateEvents.isEmpty()) {
        try {
            stateEvent = this.stateEvents.peek();
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
                    stateEvent.getTaskInstanceId());
            // if state handle success then will remove this state, otherwise will retry this state next time.
            // The state should always handle success except database error.
            checkProcessInstance(stateEvent);
            StateEventHandler stateEventHandler =
                    StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
                            .orElseThrow(() -> new StateEventHandleError(
                                    "Cannot find handler for the given state event"));
            logger.info("Begin to handle state event, {}", stateEvent);
            if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                this.stateEvents.remove(stateEvent);
            }
        } catch (StateEventHandleError stateEventHandleError) {
            logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
            this.stateEvents.remove(stateEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (StateEventHandleException stateEventHandleException) {
            logger.error("State event handle error, will retry this event: {}",
                    stateEvent,
                    stateEventHandleException);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            // we catch the exception here, since if the state event handle failed, the state event will still keep
            // in the stateEvents queue.
            logger.error("State event handle error, get a unknown exception, will retry this event: {}",
                    stateEvent,
                    e);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
}
```
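The key property of `handleEvents` is its peek-then-remove discipline. An event leaves the queue only when it is handled successfully or hits a non-recoverable error; a retryable exception leaves it at the head for another attempt. The sketch below reproduces that discipline with hypothetical exception types named after the two in the original. It adds an illustrative `maxAttempts` bound, whereas the original loop keeps retrying after a sleep.

```java
import java.util.Deque;
import java.util.function.Predicate;

class EventLoopSketch {
    // Like StateEventHandleError: the event can never succeed, so it is dropped.
    static class DropEventError extends RuntimeException {
        DropEventError(String message) { super(message); }
    }

    // Like StateEventHandleException: a transient failure, so the event is kept for retry.
    static class RetryEventException extends RuntimeException {
        RetryEventException(String message) { super(message); }
    }

    // Peeks at the head event and removes it only on success or on a DropEventError.
    // maxAttempts is an illustrative bound; the original loop retries until the queue is empty.
    static int drain(Deque<String> events, Predicate<String> handler, int maxAttempts) {
        int handled = 0;
        int attempts = 0;
        while (!events.isEmpty() && attempts < maxAttempts) {
            attempts++;
            String event = events.peek();
            try {
                if (handler.test(event)) {
                    events.remove(event);
                    handled++;
                }
            } catch (DropEventError e) {
                events.remove(event);
            } catch (RetryEventException e) {
                // keep the event at the head; it is retried on the next iteration
            }
        }
        return handled;
    }
}
```

Using `peek` instead of `poll` is what makes a crash or database error mid-handling safe: the event is still queued.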
For the event type StateEventType.TASK_STATE_CHANGE, the StateEventHandler implementation is TaskStateEventHandler. Its handleStateEvent method handles a task that needs failover as follows:
```java
if (task.getState().isFinished()) {
    if (completeTaskMap.containsKey(task.getTaskCode())
            && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
        logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
        return true;
    }
    workflowExecuteRunnable.taskFinished(task);
    if (task.getTaskGroupId() > 0) {
        logger.info("The task instance need to release task Group: {}", task.getTaskGroupId());
        workflowExecuteRunnable.releaseTaskGroup(task);
    }
    return true;
}
```
The check for whether a task is finished covers the fault-tolerance state, because isFailure() counts NEED_FAULT_TOLERANCE as a failure:
```java
public boolean isFinished() {
    return isSuccess() || isKill() || isFailure() || isPause();
}

public boolean isFailure() {
    return this == TaskExecutionStatus.FAILURE || this == NEED_FAULT_TOLERANCE;
}
```
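The consequence of these two methods can be checked with a reduced enum. The enum below, `TaskStatusSketch`, is hypothetical and keeps only a few constants of `TaskExecutionStatus`, but it copies the two predicates. It confirms that a task marked `NEED_FAULT_TOLERANCE` is treated as finished and therefore enters `taskFinished()`.

```java
// Reduced, hypothetical copy of TaskExecutionStatus with only the constants needed here.
enum TaskStatusSketch {
    SUCCESS, KILL, FAILURE, PAUSE, NEED_FAULT_TOLERANCE, RUNNING_EXECUTION;

    // A fault-tolerance task counts as a failure...
    boolean isFailure() {
        return this == FAILURE || this == NEED_FAULT_TOLERANCE;
    }

    // ...and therefore as finished.
    boolean isFinished() {
        return this == SUCCESS || this == KILL || isFailure() || this == PAUSE;
    }
}
```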
It then calls workflowExecuteRunnable.taskFinished(task), which handles the events triggered by a task instance's state change. Here we only look at the branch related to fault tolerance:
```java
// excerpt from taskFinished(); the preceding branches are omitted
} else if (taskInstance.taskCanRetry() && !processInstance.getState().isReadyStop()) {
    // retry task
    logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
    retryTaskInstance(taskInstance);
}

// Checks for the fault-tolerance state, which failoverTaskInstance() set earlier.
public boolean taskCanRetry() {
    if (this.isSubProcess()) {
        return false;
    }
    if (this.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE) {
        return true;
    }
    return this.getState() == TaskExecutionStatus.FAILURE && (this.getRetryTimes() < this.getMaxRetryTimes());
}
```
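`taskCanRetry` means that failover does not consume the task's retry budget. A `NEED_FAULT_TOLERANCE` task is retried even when `retryTimes` has reached `maxRetryTimes`, or when no retries are configured at all. The sketch below restates the rule as a pure function; the class, enum and parameter names are hypothetical.

```java
class RetryRuleSketch {
    enum State { FAILURE, NEED_FAULT_TOLERANCE, SUCCESS }

    // Same rule as taskCanRetry(): sub-processes never retry, fault-tolerance tasks always do,
    // and ordinary failures only while retry attempts remain.
    static boolean canRetry(boolean isSubProcess, State state, int retryTimes, int maxRetryTimes) {
        if (isSubProcess) {
            return false;
        }
        if (state == State.NEED_FAULT_TOLERANCE) {
            return true;
        }
        return state == State.FAILURE && retryTimes < maxRetryTimes;
    }
}
```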
```java
/**
 * create new task instance to retry, different objects from the original
 *
 */
private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
    if (!taskInstance.taskCanRetry()) {
        return;
    }
    TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
    if (newTaskInstance == null) {
        logger.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}",
                taskInstance.getTaskCode(),
                taskInstance.getId());
        return;
    }
    waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
    if (!taskInstance.retryTaskIntervalOverTime()) {
        logger.info(
                "Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}",
                processInstance.getId(), newTaskInstance.getTaskCode(),
                newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(),
                newTaskInstance.getRetryInterval());
        stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
        stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
    } else {
        addTaskToStandByList(newTaskInstance);
        submitStandByTask();
        waitToRetryTaskInstanceMap.remove(newTaskInstance.getTaskCode());
    }
}
```
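For a fault-tolerance instance, the else branch is taken, because the retry-interval wait applies to ordinary failures. The newly cloned task instance is put back into the readyToSubmitTaskQueue and submitted again:
```java
addTaskToStandByList(newTaskInstance);
submitStandByTask();
```
From here on it is handled like any normal task: submitTaskExec dispatches it to a worker for execution.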
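`retryTaskIntervalOverTime` is not shown in the article. The sketch below captures the branch that `retryTaskInstance` takes, under the assumption that the retry interval only delays ordinary `FAILURE` retries and that a zero interval means no wait. Under that assumption, a failed-over instance goes straight to the standby queue. `RetryRoutingSketch`, `Route` and the millisecond-based parameters are hypothetical.

```java
import java.util.concurrent.TimeUnit;

class RetryRoutingSketch {
    enum Route { WAIT_FOR_RETRY_INTERVAL, SUBMIT_NOW }

    // Assumption: only ordinary FAILURE retries wait out the interval; fault-tolerance retries do not.
    static Route route(boolean isFaultTolerance, long retryIntervalMinutes,
                       long failedAtMillis, long nowMillis) {
        if (isFaultTolerance || retryIntervalMinutes == 0) {
            return Route.SUBMIT_NOW; // corresponds to addTaskToStandByList + submitStandByTask
        }
        long elapsedMillis = nowMillis - failedAtMillis;
        return elapsedMillis > TimeUnit.MINUTES.toMillis(retryIntervalMinutes)
                ? Route.SUBMIT_NOW
                : Route.WAIT_FOR_RETRY_INTERVAL; // corresponds to the state-wheel retry check
    }
}
```

Skipping the wait for failed-over tasks is reasonable: the task did not fail on its own merits, so there is nothing to back off from.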
Summary
The worker fault-tolerance flow is roughly as follows:
1- The Master learns, through its ZooKeeper watch, which Worker node needs failover.
2- Each Master only fails over the workflow instances it schedules itself. Before failing a task over, it compares the task instance's start time with the worker's startup time, and skips tasks that started after the worker (re)started.
3- Task instances that need failover are put back into readyToSubmitTaskQueue and submitted to run again.

