Flink ExecutionGraph生成源碼解析
引言
前面我們在介紹DataStream時,介紹了Flink任務(wù)提交時從StreamGraph->JobGraph->ExecutionGraph的過程,而如何生成ExecutionGraph并沒介紹,本節(jié)來介紹在具體調(diào)度執(zhí)行時使用的圖結(jié)構(gòu)ExecutionGraph。StreamGraph和JobGraph是在Client生成的。
ExecutionGraph是在JobManager(Flink任務(wù)執(zhí)行時的Master節(jié)點(diǎn))端生成的,JobManager會根據(jù)提交的JobGraph來生成ExecutionGraph。
重要類
DefaultExecutionGraph
ExecutionGraph的實(shí)現(xiàn)類,保存了具體的Graph結(jié)構(gòu)信息、具體執(zhí)行時的作業(yè)和任務(wù)相關(guān)信息以及作業(yè)執(zhí)行中的中間結(jié)果信息等。相關(guān)重要屬性如下
// JobGraph的節(jié)點(diǎn)ID和ExecutionGraph的節(jié)點(diǎn)信息映射
private final Map<JobVertexID, ExecutionJobVertex> tasks;
// 按依賴順序的Execution節(jié)點(diǎn)數(shù)據(jù)
private final List<ExecutionJobVertex> verticesInCreationOrder;
// 執(zhí)行嘗試信息
private final Map<ExecutionAttemptID, Execution> currentExecutions;
//中間結(jié)果數(shù)據(jù)信息
private final Map<IntermediateDataSetID, IntermediateResult> intermediateResults;
// 當(dāng)前作業(yè)狀態(tài)
private volatile JobStatus state = JobStatus.CREATED;
//執(zhí)行拓?fù)浣Y(jié)構(gòu)
private DefaultExecutionTopology executionTopology;
// checkpoint處理協(xié)調(diào)器
@Nullable private CheckpointCoordinator checkpointCoordinator;
ExecutionJobVertex
在ExecutionGraph中的節(jié)點(diǎn)信息,與JobGraph的JobVertex是一一對應(yīng)的。其中存儲了
// 每個子任務(wù)節(jié)點(diǎn)信息
@Nullable private ExecutionVertex[] taskVertices;
// 產(chǎn)出數(shù)據(jù)集
@Nullable private IntermediateResult[] producedDataSets;
// 輸入數(shù)據(jù)集
@Nullable private List<IntermediateResult> inputs;
// 并行度
private final VertexParallelismInformation parallelismInfo;
ExecutionVertex
ExecutionJobVertex中根據(jù)并行度生成的單個子任務(wù),包括具體的子任務(wù)的編號,執(zhí)行信息等
IntermediateResult
節(jié)點(diǎn)的每個輸出鏈對應(yīng)一個IntermediateResult,每個IntermediateResult下按ExecutionJobVertex的并行度對應(yīng)有相應(yīng)的IntermediateResultPartition。
SlotSharingGroup
定義不同節(jié)點(diǎn)的任務(wù)可以部署到同一個slot中,對slot進(jìn)行共享,更為有效的使用slot資源。
ExecutionGraph生成
ExecutionGraph是Scheduler(JobManager中的負(fù)責(zé)調(diào)度處理的類)中實(shí)例化時通過調(diào)用createAndRestoreExecutionGraph方法來生成ExecutionGraph的。其最終調(diào)用的是DefaultExecutionGraphBuilder類中的buildGraph()方法。其具體流程如下:
- 創(chuàng)建一個DefaultExecutionGraph的實(shí)例,這里主要是傳入一些參數(shù)處理,并沒有關(guān)聯(lián)JobGraph的信息;
- 初始化JobVertex,處理inputoutput的格式信息;
- 將JobGraph的所有JobVertex進(jìn)行按依賴順序進(jìn)行排序處理;
- 調(diào)用ExecutionGraph的attachJobGraph方法將JobVertex列表信息綁定到ExecutionGraph
每一個ExecutionJobVertex對應(yīng)一個JobVertex,每一個IntermediateResult對應(yīng)到一個JobVertex的IntermediateDataSet,再根據(jù)JobVertex的并行度生成對應(yīng)數(shù)量的ExecutionVertex,用數(shù)組存儲
根據(jù)JobVertex的inputs信息初始化ExecutionJobVertex的inputs信息。 - 配置statebackend和checkpoint信息,此部分留到介紹checkpoint時再詳細(xì)介紹
總結(jié)
本篇接著01-DataStream基礎(chǔ)介紹了JobGraph到ExecutionGraph的轉(zhuǎn)換過程。首先介紹了ExecutionGraph中的相關(guān)核心概念,如ExecutionJobVertex、IntermediateResult等。后面介紹了ExecutionGraph的詳細(xì)生成過程。
在ExecutionGraph生成的最后會設(shè)置checkpoint等信息,此塊后面單獨(dú)介紹。ExecutionGraph生成好后,會通過DefaultScheduler的startScheduling()方法來觸發(fā)進(jìn)行調(diào)度(具體調(diào)度及運(yùn)行后面介紹),更多關(guān)于Flink ExecutionGraph生成的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot整合JWT框架,解決Token跨域驗(yàn)證問題
Json web token (JWT), 是為了在網(wǎng)絡(luò)應(yīng)用環(huán)境間傳遞聲明而執(zhí)行的一種基于JSON的開放標(biāo)準(zhǔn)((RFC 7519).定義了一種簡潔的,自包含的方法用于通信雙方之間以JSON對象的形式安全的傳遞信息。2021-06-06
一文了解MyBatis Plus批量數(shù)據(jù)插入功能
mybatisPlus底層的新增方法是一條一條的新增的,下面這篇文章主要給大家介紹了MyBatis Plus批量數(shù)據(jù)插入功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2021-09-09
Java事務(wù)管理學(xué)習(xí)之JDBC詳解
這篇文章主要介紹了Java事務(wù)管理學(xué)習(xí)之JDBC的相關(guān)資料,文中介紹的非常詳細(xì),相信對大家具有一定的參考價值,需要的朋友們下面來一起看看吧。2017-03-03
Spring Boot部署到Tomcat過程中遇到的問題匯總
這篇文章主要給大家分享了關(guān)于Spring Boot部署到Tomcat過程中遇到的一些問題,文中將解決的方法介紹非常詳細(xì),對同樣遇到這個問題的朋友具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-03-03

