Flink JobGraph生成源碼解析
引言
在DataStream基礎(chǔ)中,由于其中的內(nèi)容較多,只是介紹了JobGraph的結(jié)果,而沒有涉及到StreamGraph到JobGraph的轉(zhuǎn)換過程。本篇我們來介紹下JobGraph的生成的詳情,重點是Operator可以串聯(lián)成Chain的條件
概念
首先我們來回顧下JobGraph中的相關(guān)概念
- JobVertex:job的頂點,即對應(yīng)的計算邏輯(這里用的是Vertex, 而前面用的是Node,有點差異),通過inputs記錄了所有來源的Edge,而輸出是ArrayList來記錄
- JobEdge: job的邊,記錄了源Vertex和目標表Vertex.
- IntermediateDataSet: 定義了一個中間數(shù)據(jù)集,但并沒有存儲,只是記錄了一個Producer(JobVertex)和一個Consumer(JobEdge)
JobGraph生成
前面我們在介紹部署的時候,有介紹具體是通過PipelineExecutor的execute()方法來提交對應(yīng)的任務(wù),StreamGraph到JobGraph的轉(zhuǎn)換邏輯就是在該方法中處理的,具體是通過如下方法來進行處理
public static JobGraph getJobGraph(
@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration)
最后執(zhí)行轉(zhuǎn)換的類為FlinkPipelineTranslator,調(diào)用的是其中的translateToJobGraph方法。
JobGraph translateToJobGraph(
Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism);

這里有2個不同的實現(xiàn)類
- StreamGraphTranslator:對StreamGraph的Pipeline進行轉(zhuǎn)換處理
- PlanTranslator:對Plan類型的Pipeline進行轉(zhuǎn)換處理,用于SQL場景。 而這2個分別對應(yīng)到2個不同的類來生成JobGraph,分別如下:
- StreamingJobGraphGenerator
- JobGraphGenerator 本篇我們重點介紹StreamGraph到JobGraph的轉(zhuǎn)換StreamingJobGraphGenerator, JogGraphGenerator這塊等到介紹FlinkSQL的時候來介紹。StreamingJobGraphGenerator類中具體轉(zhuǎn)換處理的邏輯如下:
private JobGraph createJobGraph() {
preValidate();
jobGraph.setJobType(streamGraph.getJobType());

jobGraph.enableApproximateLocalRecovery(
streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes =
defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
setChaining(hashes, legacyHashes);
setPhysicalEdges();
setSlotSharingAndCoLocation();
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
configureCheckpointing();
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
JobGraphUtils.prepareUserArtifactEntries(
streamGraph.getUserArtifacts().stream()
.collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
jobGraph.getJobID());
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
distributedCacheEntries.entrySet()) {
jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
}
// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
} catch (IOException e) {
throw new IllegalConfigurationException(
"Could not serialize the ExecutionConfig."
+ "This indicates that non-serializable types (like custom serializers) were registered");
}
addVertexIndexPrefixInVertexName();
setVertexDescription();
return jobGraph;
}
重點我們介紹以下幾點
生成hash值
對每個streamNode生成一個hash值,用于來標識節(jié)點,用于重新提交任務(wù)后涉及恢復(fù)作業(yè)的場景。具體生成hash值的邏輯如下:
- 如果指定了id信息,如Transformation.getUid(), 就用該值來生成hash值
- 否則使用鏈上的輸出node和節(jié)點的輸入nodes的hash值來生成一個hash值 對具體的算法細節(jié)感興趣的同學(xué)可以深入研究StreamGraphHasherV2的具體內(nèi)容。
生成chain
如果連接的2個節(jié)點滿足一定的條件,就會把這2個節(jié)點放到一個chain里面,這樣可以避免上下游算子間發(fā)送數(shù)據(jù)的網(wǎng)絡(luò)開銷和序列化反序列化的性能開銷。判斷算子是否可以組成一個chain的判斷邏輯如下:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}
private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& arePartitionerAndExchangeModeChainable(
edge.getPartitioner(),
edge.getExchangeMode(),
streamGraph.getExecutionConfig().isDynamicGraph())
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled())) {
return false;
}
// check that we do not have a union operation, because unions currently only work
// through the network/byte-channel stack.
// we check that by testing that each "type" (which means input position) is used only once
for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
return false;
}
}
return true;
}
具體解讀如下:
- 下游節(jié)點只有1個輸入邊
- 上游節(jié)點和下游節(jié)點是在同一個SlotSharingGroup,slotSharingGroup在沒有設(shè)置的情況下,默認為default;
- 上下游節(jié)點的算子的chaining策略是支持chain的,上游算子的chaining策略為ALWAYS\HEAD\HEAD_WITH_SOURCES,下游算子的chaining策略為ALWAYS或者(HEAD_WITH_SOURCES且上游算子為source算子,具體這些策略的說明見ChainingStrategy.java
- 邊的分區(qū)策略是ForwardForConsecutiveHashPartitioner或者分區(qū)策略是ForwardPartitioner且數(shù)據(jù)交換方式(StreamExchangeMode)不是批模式
- 上下游節(jié)點的并行度一致
- StreamGraph是允許Chaining的
總結(jié)
本篇介紹了StreamGraph到JobGraph的生成流程,重點是在上下游節(jié)點是需要滿足什么條件才能chain到一起的具體邏輯。
以上就是Flink JobGraph生成源碼解析的詳細內(nèi)容,更多關(guān)于Flink JobGraph生成的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
為什么wait和notify必須放在synchronized中使用
這篇文章主要介紹了為什么wait和notify必須放在synchronized中使用,文章圍繞主題的相關(guān)問題展開詳細介紹,具有一定的參考價值,需要的小伙伴可以參考以參考一下2022-05-05
解決springboot自定義注解AOP在controller上導(dǎo)致controller注入失敗問題
這篇文章主要介紹了解決springboot自定義注解AOP在controller上導(dǎo)致controller注入失敗問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-10-10
Java中Velocity快速對變量中的引號特殊字符進行轉(zhuǎn)義
Velocity是一個基于Java的模板引擎,與Freemarker類似,這篇文章主要介紹了Java中Velocity如何對變量中的引號特殊字符進行轉(zhuǎn)義,主要記錄一下在使用中碰到的要對引號特殊字符進行轉(zhuǎn)義的問題,需要的朋友可以參考下2023-07-07
JAVA開發(fā)中的一些規(guī)范講解(阿里巴巴Java開發(fā)規(guī)范手冊)
這篇文章主要介紹了JAVA開發(fā)中的一些規(guī)范講解(阿里巴巴Java開發(fā)規(guī)范手冊),需要的朋友可以參考下2018-04-04
解決IDEA中Maven依賴包導(dǎo)入失敗報紅問題(總結(jié)最有效8種解決方案)
這篇文章主要介紹了解決IDEA中Maven依賴包導(dǎo)入失敗報紅問題,本文通過圖文詳解給大家總結(jié)了最有效的8種解決方法,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07

