Flink作業(yè)Task運(yùn)行源碼解析
引言
上一篇我們分析了Flink部署集群的過程和作業(yè)提交的方式,本篇我們來分析下,具體作業(yè)是如何被調(diào)度和計(jì)算的。具體分為2個部分來介紹
- 作業(yè)運(yùn)行的整體框架,對相關(guān)的重要角色有深入了解
- 計(jì)算流程,重點(diǎn)是如何調(diào)度具體的operator機(jī)制
概覽
首先我們來了解下整體的框架 JobMaster: 計(jì)算框架的主節(jié)點(diǎn),負(fù)責(zé)運(yùn)行單個JobGraph,包括任務(wù)的調(diào)度,資源申請和TaskManager的管理等。 TaskExecutor: 負(fù)責(zé)多個Task的具體執(zhí)行 Dispatcher接收到submitJob的請求后,會生成一個JobMaster實(shí)例(具體為Dispatcher創(chuàng)建JobManagerRunner,JobManagerRunner創(chuàng)建JobMaster),下面來具體介紹下JobMaster和TaskExecutor的內(nèi)部信息
調(diào)度框架
JobMaster
private final SchedulerNG schedulerNG;
private final ShuffleMaster<?> shuffleMaster;
private final SlotPoolService slotPoolService;
private final LeaderRetrievalService resourceManagerLeaderRetriever;
private final BlobWriter blobWriter;
private final JobMasterPartitionTracker partitionTracker;
private HeartbeatManager<TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport>
taskManagerHeartbeatManager;
private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
JobMaster作為整個任務(wù)調(diào)度計(jì)算的主節(jié)點(diǎn),需要和一些外部角色進(jìn)行交互,具體的如下:
- resourceManagerLeaderRetriever: 負(fù)責(zé)和resourceManager間的通訊
- slotPoolService: 用于管理slotpool的,slot資源管理,負(fù)責(zé)slot的申請、釋放等。
- partitionTracker: 負(fù)責(zé)算子計(jì)算結(jié)果數(shù)據(jù)分區(qū)的跟蹤
- schedulerNG:內(nèi)部的調(diào)度引擎,負(fù)責(zé)job的調(diào)度處理
- shuffleMaster: 數(shù)據(jù)shuffle處理
- taskManagerHeartbeatManager:記錄和taskManager間的心跳信息,
- resourceManagerHeartbeatManager:記錄和resourceManager間的心跳
ScheduleNG
ScheduleNG實(shí)際負(fù)責(zé)job調(diào)度處理,包括生成ExecutionGraph,作業(yè)的調(diào)度執(zhí)行,任務(wù)出錯處理等。其實(shí)現(xiàn)類為DefaultScheduler
- SchedulingStrategy:任務(wù)調(diào)度的策略,實(shí)現(xiàn)類為PipelinedRegionSchedulingStrategy,按pipeline region的粒度來調(diào)度任務(wù)
- ExecutionGraphFactory:其實(shí)現(xiàn)類為DefaultExecutionGraphFactory,創(chuàng)建ExecutionGraph的工廠類
TaskExecutor
實(shí)際任務(wù)運(yùn)行的節(jié)點(diǎn),該類負(fù)責(zé)多個任務(wù)的運(yùn)行,首先我們看看其實(shí)現(xiàn)了TaskExecutorGateway接口,TaskExecutorGateway定義了各類可以調(diào)用的功能接口,具體內(nèi)容見下表
| 分類 | 方法名 | 說明 |
|---|---|---|
| Task操作相關(guān) | SubmitTask | 向TaskExecutor提交任務(wù) |
| Task操作相關(guān) | cancelTask | 取消指定的任務(wù) |
| Task操作相關(guān) | sendOperatorEventToTask | 發(fā)送算子事件給Task |
| Slot操作相關(guān) | requestSlot | 給指定的Job分配指定的slot |
| Slot操作相關(guān) | freeSlot | 釋放對應(yīng)的slot |
| Slot操作相關(guān) | freeInactiveSlots | 釋放指定Job的未使用的slot |
| Partition操作相關(guān) | updatePartitions | 更新分區(qū)信息 |
| Partition操作相關(guān) | releaseOrPromotePartitions | 批量釋放或保留分區(qū) |
| Partition操作相關(guān) | releaseClusterPartitions | 釋放屬于給定datasets的所有集群分區(qū)數(shù)據(jù) |
| checkpoint操作相關(guān) | triggerCheckpoint | 觸發(fā)指定任務(wù)的checkpoint處理 |
| checkpoint操作相關(guān) | confirmCheckpoint | 確認(rèn)指定任務(wù)的checkpoint |
| checkpoint操作相關(guān) | abortCheckpoint | 終止給定任務(wù)的checkpoint |
Task
一個Task負(fù)責(zé)TaskManager上一個subtask的一次執(zhí)行,Task對Flink Operator進(jìn)行包裝然后運(yùn)行,并提供需要的各類服務(wù),如消費(fèi)輸入數(shù)據(jù),生產(chǎn)數(shù)據(jù)以及和JobManager通訊。Task實(shí)現(xiàn)了Runnable接口,即通過一個單獨(dú)的線程來運(yùn)行,而其中的Flink Operator部分封裝在實(shí)現(xiàn)了TaskInvokable接口的類中,實(shí)現(xiàn)類主要為SourceStreamTask和OneInputStreamTask。下面分別詳細(xì)介紹下這幾個類
- Task: 對應(yīng)為一個線程,來運(yùn)行具體的Operator的邏輯,并包括相關(guān)的其他的輔助功能,包括如執(zhí)行狀態(tài)的管理、結(jié)果數(shù)據(jù)管理(ResultPartitionWriters)、輸入數(shù)據(jù)(IndexInputGate)以及生成封裝了Operator邏輯的TaskInvokable實(shí)例并運(yùn)行
- TaskInvokable:封裝了具體Operator的處理邏輯,主要包括有2個方法,restore()和invoke()。restore()方法在invoke()之前調(diào)用,用于恢復(fù)上次的有效狀態(tài)。invoke()方法執(zhí)行具體的處理邏輯。下面我們看看其實(shí)現(xiàn)子類(這里只列了與StreamGraph相關(guān)的實(shí)現(xiàn)類,對于其他的子類沒有展示)

- SourceStreamTask:用于執(zhí)行StreamSource,即源頭的讀取數(shù)據(jù)類Operator
- OneInputStreamTask:用于執(zhí)行OneInputStreamOperator,即只有一個輸入的operator
- TwoInputStreamTask: 用于執(zhí)行TwoInputStreamOperator,有2個輸入的operator
- MultipleInputStreamTask: 用于執(zhí)行MultipleInputStreamOperator,有多個輸入的operator
計(jì)算框架
計(jì)算框架這節(jié)主要來了解數(shù)據(jù)是如何在Flink中如何處理和流轉(zhuǎn)的。這里我們主要回答以下幾個問題:
- Flink中整個數(shù)據(jù)的處理流程,單條數(shù)據(jù)是如何在各個算子間流轉(zhuǎn)和處理的
- 對于算子chain和其他算子其底層實(shí)現(xiàn)區(qū)別是怎樣的,為何chain后的效率會高 我們先以StreamMap算子為例來看整體計(jì)算框架的設(shè)計(jì)
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
這里StreamMap實(shí)現(xiàn)了Input接口,其中在實(shí)現(xiàn)的processElement()方法中實(shí)現(xiàn)了具體的對具體數(shù)據(jù)的操作處理(Operator),并將結(jié)果通過Output接口的collect()方法發(fā)射出去。我們先看看這2個接口定義的方法

基本上2邊是一一對應(yīng)的關(guān)系,Input負(fù)責(zé)處理Element\Watermark\WatermarkStatus\LatencyMarker,而Output負(fù)責(zé)emit這些。這里Input是處理一個輸入的,如果是2個輸入那對應(yīng)的就是TwoInputStreamOperator
算子計(jì)算處理
對于Chain的操作,是通過Output接口的實(shí)現(xiàn)類ChainingOutput.java
// ChainingOutput.java
@Override
public void collect(StreamRecord<T> record) {
pushToOperator(record);
}
protected <X> void pushToOperator(StreamRecord<X> record) {
try {
...
input.setKeyContextElement(castRecord);
input.processElement(castRecord);
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
這里可以看到在output.collect()方法中把數(shù)據(jù)再推送到了算子,然后算子(input)繼續(xù)執(zhí)行processElement()這樣來實(shí)現(xiàn)了在當(dāng)前線程內(nèi)的pipeline處理,
總結(jié)
本篇我們介紹了Flink是如何來執(zhí)行相應(yīng)的算子來實(shí)現(xiàn)計(jì)算的,主要介紹了TaskExecutor運(yùn)行的Task實(shí)現(xiàn),以及chain算子是如何串行來運(yùn)行的。對于算子之間的數(shù)據(jù)交互這塊我們后面一篇來單獨(dú)介紹。
以上就是Flink作業(yè)Task運(yùn)行源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Flink作業(yè)Task運(yùn)行的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
基于springmvc之常用注解,操作傳入?yún)?shù)
這篇文章主要介紹了springmvc之常用注解,操作傳入?yún)?shù)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
Spring?web開發(fā)教程之Request獲取3種方式
這篇文章主要給大家介紹了關(guān)于Spring?web開發(fā)教程之Request獲取3種方式的相關(guān)資料,request對象是從客戶端向服務(wù)器發(fā)出請求,包括用戶提交的信息以及客戶端的一些信息,需要的朋友可以參考下2023-11-11
Postman實(shí)現(xiàn)傳List<String>集合
這篇文章主要介紹了Postman實(shí)現(xiàn)傳List<String>集合方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
SpringBoot定制JSON響應(yīng)數(shù)據(jù)返回的示例代碼
@JsonView 是 Jackson 庫中的一個注解,它允許你定義哪些屬性應(yīng)該被序列化到 JSON 中,基于不同的“視圖”或“配置”,在本文中,通過了解@JsonView,你將能夠更好地掌握如何在Spring Boot應(yīng)用中定制JSON數(shù)據(jù)的輸出,需要的朋友可以參考下2024-05-05

