Flink DataStream基礎(chǔ)框架源碼分析
引言
希望通過(guò)對(duì)Flink底層源碼的學(xué)習(xí)來(lái)更深入了解Flink的相關(guān)實(shí)現(xiàn)邏輯。這里新開(kāi)一個(gè)Flink源碼解析的系列來(lái)深入介紹底層源碼邏輯。說(shuō)明:這里默認(rèn)相關(guān)讀者具備Flink相關(guān)基礎(chǔ)知識(shí)和開(kāi)發(fā)經(jīng)驗(yàn),所以不會(huì)過(guò)多介紹相關(guān)的基礎(chǔ)概念相關(guān)內(nèi)容,F(xiàn)link使用的版本為1.15.2。初步確定按如下幾個(gè)大的方面來(lái)介紹
計(jì)算模型
- DataStream基礎(chǔ)框架
部署&調(diào)度
存儲(chǔ)體系
底層支撐
概覽
本篇是第一篇,介紹計(jì)算模型的基礎(chǔ)DataStream的相關(guān)內(nèi)容,這一篇只介紹DataStream的基礎(chǔ)內(nèi)容,如如何實(shí)現(xiàn)相關(guān)的操作,數(shù)據(jù)結(jié)構(gòu)等,不會(huì)涉及到窗口、事件事件和狀態(tài)等信息
DataStream是對(duì)數(shù)據(jù)流的一個(gè)抽象,其提供了豐富的操作算子(例如過(guò)濾、map、join、聚合、定義窗口等)來(lái)對(duì)數(shù)據(jù)流進(jìn)行處理,下圖描述了Flink中源數(shù)據(jù)通過(guò)DataStream的轉(zhuǎn)換最后輸出的整個(gè)過(guò)程。

通過(guò)上圖可以來(lái)構(gòu)想下,一般一個(gè)DataStream具有如下主要屬性

| 屬性 | 說(shuō)明 |
|---|---|
| 上游依賴(lài) | 標(biāo)識(shí)上游依賴(lài)信息,這樣能把整個(gè)處理流程串聯(lián)起來(lái) |
| 并行度 | 處理邏輯的并行度信息,這樣可以提高處理的速度 |
| 輸入格式 | 指定輸入數(shù)據(jù)的格式,如InnerType { public int id; public String text; } |
| 輸出格式 | 指定輸出數(shù)據(jù)的格式 |
| 處理邏輯 | 上游datastream轉(zhuǎn)換到目前的datastream的具體邏輯操作,如map的具體邏輯信息。 |
最終整個(gè)數(shù)據(jù)流會(huì)生成一個(gè)DAG圖(有向無(wú)環(huán)圖),通過(guò)這個(gè)DAG圖就可以生成對(duì)應(yīng)的任務(wù)來(lái)運(yùn)行了。下面來(lái)具體分析DataStream的實(shí)現(xiàn)和生成DAG圖(Flink中叫Graph)
深入DataStream
首先我們通過(guò)下圖來(lái)看看,DataStream中的一些主要的輔助類(lèi),DataStream類(lèi)本身主要邏輯是對(duì)各類(lèi)轉(zhuǎn)換關(guān)系和sink的操作,而前面說(shuō)到的一些主要屬性信息都是通過(guò)輔助類(lèi)來(lái)處理的。

Transformation:本身主要管理了輸出格式、上游依賴(lài)、并行度、id編號(hào)等信息以及StreamOperator的工廠類(lèi)(StreamOperatorFactory)
StreamOperator:主要是各類(lèi)操作的具體處理邏輯
Function:用戶(hù)自定義函數(shù)的接口,如DataStream中map處理時(shí)需要傳入的MapFunction就是Function的子接口
DataStream
屬性和方法

DataStream的屬性比較簡(jiǎn)單,就2個(gè),1個(gè)是實(shí)行的環(huán)境信息,另一個(gè)是Transformation。
DataStream中的方法主要分為以下幾類(lèi)
- 基礎(chǔ)屬性信息:如獲取并行度,id,輸出格式等,大多數(shù)是代理來(lái)調(diào)用Transformation中對(duì)應(yīng)的方法
- 轉(zhuǎn)換操作:各類(lèi)的轉(zhuǎn)換處理,如map、filter、shuffle、join等
- 輸出處理:各類(lèi)輸出的sink處理,如保存為文本等,不過(guò)大多數(shù)方法都不推薦使用了,這里主要的方法是addSink()
- 觸發(fā)執(zhí)行: 如executeAndCollect,內(nèi)部是調(diào)用了env.executeAsync來(lái)執(zhí)行streaming dataflow
除了轉(zhuǎn)換操作外,其他幾類(lèi)的邏輯都比較直觀和簡(jiǎn)單,這里重點(diǎn)介紹下轉(zhuǎn)換操作的處理,轉(zhuǎn)換操作這里分為3類(lèi),1.返回是一個(gè)DataStream。如map、filter、union、keyBy等;2.返回的是一個(gè)Streams,即輸入是多個(gè)DataStream,這類(lèi)的操作主要是多流關(guān)聯(lián)的操作,如join、coGroup。這些Streams的類(lèi)中實(shí)現(xiàn)了一些方法,來(lái)返回一個(gè)DataStream; 3.window類(lèi),返回的是AllWindowedStream類(lèi)型,同樣這些類(lèi)中也是有方法,來(lái)返回一個(gè)DataStream。
說(shuō)明:如上的各個(gè)分類(lèi)都是個(gè)人基于理解上做的各個(gè)分類(lèi)處理,非官方定義
類(lèi)體系
DataStream的類(lèi)圖關(guān)系比較簡(jiǎn)單,就如下這幾個(gè)類(lèi),具體每個(gè)子類(lèi)的信息見(jiàn)下表

| 子類(lèi)名 | 說(shuō)明 |
|---|---|
| SingleOutputStreamOperator | 只有1個(gè)輸出的DataStream |
| IterativeStream | 迭代的DataStream,具體使用場(chǎng)景后面分析 |
| DataStreamSource | 最開(kāi)始的DataStream,里面有source的信息 |
| KeyedStream | 有一個(gè)Key信息的DataStream |
Transformation
屬性和方法
DataStream類(lèi)本身主要是提供了給外部的編程接口的支持,而對(duì)Streaming flow算子節(jié)點(diǎn)本身的一些屬性和操作則由Transformation來(lái)負(fù)責(zé)

從上圖可以看出其主要屬性有節(jié)點(diǎn)id,名稱(chēng),并行度,輸出類(lèi)型還有一些與資源相關(guān)的內(nèi)容,還有一個(gè)是上游的輸入Transformation,由于這個(gè)因不同的Transformation會(huì)有不同的數(shù)據(jù)個(gè)數(shù),所以這個(gè)信息是放在各個(gè)子類(lèi)中的。如ReduceTransformation是有一個(gè)input屬性來(lái)記錄上游依賴(lài),而如TwoInputTransformation則是有2個(gè)屬性input1和input2來(lái)記錄上游依賴(lài),另外如SourceTransformation,這個(gè)是源頭的Transformation,是沒(méi)有上游依賴(lài)的Transformation,所以沒(méi)有屬性來(lái)記錄,但是有個(gè)Source屬性來(lái)記錄Source輸入
具體對(duì)數(shù)據(jù)的操作處理,在Transformation里面有個(gè)StreamOperatorFactory屬性,其中的StreamOperator實(shí)現(xiàn)了各種的處理算子。注意這里不是所有的Transformation都包含StreamOperatorFactory,如SourceTransformation中就沒(méi)有,這個(gè)具體大家可以看看相關(guān)的代碼。
Transformation的方法基本上是對(duì)上述屬性的get和set操作,這里重點(diǎn)要說(shuō)明一下的是PhysicalTransformation(下面類(lèi)體系來(lái)介紹)中的setChainingStrategy方法,這里的ChainingStrategy是一個(gè)枚舉類(lèi),主要是控制多個(gè)連續(xù)的算子是否可以進(jìn)行鏈?zhǔn)教幚?,這個(gè)具體的我們?cè)谙旅娼榻BStreamGraph時(shí)再介紹

類(lèi)體系

Transformation的大多數(shù)類(lèi)均為PhysicalTransformation的子類(lèi),PhysicalTransformation為有物理操作的,重點(diǎn)是這類(lèi)的子類(lèi)是支持Chaining操作的。我們先來(lái)看看其重要子類(lèi)
| 子類(lèi)名 | 說(shuō)明 |
|---|---|
| SourceTransformation | 連接Source的Transformation,是整個(gè)streaming flow的最開(kāi)始的轉(zhuǎn)換處理 |
| SinkTransformation | 輸出的轉(zhuǎn)換處理,是整個(gè)streaming flow的最后一個(gè) |
| OneInputTransformation | 只有1個(gè)輸入的轉(zhuǎn)換處理,如map、filter這類(lèi)的處理 |
| TwoInputTransformation | 有2個(gè)輸入的轉(zhuǎn)換處理 |
StreamOperator
屬性和方法
StreamOperator負(fù)責(zé)對(duì)數(shù)據(jù)進(jìn)行處理的具體邏輯,如map處理的StreamMap,由于各個(gè)Operator的處理方式的不同,這里主要以AbstractStreamOperator來(lái)介紹一些主要的屬性,如output的數(shù)據(jù),StreamConfig,StreamingRuntimeContext等。
下面我們重點(diǎn)介紹下相關(guān)的方法
StreamOperator接口有定義了重要的3個(gè)方法(這里只介紹與數(shù)據(jù)基礎(chǔ)處理相關(guān)的部分)
| 方法 | 說(shuō)明 |
|---|---|
| open() | 數(shù)據(jù)處理的前處理,如算子的初始化操作等 |
| finish() | 數(shù)據(jù)處理的后處理,如緩存數(shù)據(jù)的flush操作等 |
| close() | 該方法在算子生命周期的最后調(diào)用,不管是算子運(yùn)行成功還是失敗或者取消,主要是對(duì)算子使用到的資源的各種釋放處理 |
另外關(guān)注的對(duì)數(shù)據(jù)進(jìn)行實(shí)際處理的方法,
| 接口 | 方法 | 說(shuō)明 |
|---|---|---|
| OneInputStreamOperator | processElement() | 對(duì)數(shù)據(jù)元素進(jìn)行處理,實(shí)際該接口在OneInputStreamOperator的父接口Input中定義 |
| TwoInputStreamOperator | processElement1() | 對(duì)input1的數(shù)據(jù)元素進(jìn)行處理 |
| processElement2() | 對(duì)input2的數(shù)據(jù)元素進(jìn)行處理 |
類(lèi)體系
StreamOpterator的子類(lèi)非常多,包括測(cè)試類(lèi)的一起有287個(gè),這些大致可以歸屬到如下3個(gè)子類(lèi)中,

| 類(lèi)名 | 說(shuō)明 |
|---|---|
| OneInputStreamOperator | 只有1個(gè)輸入的源 |
| TwoInputStreamOperator | 有2個(gè)輸入源 |
| AbstractStreamOperator |
Function
Function是針對(duì)所有的用戶(hù)自定義的函數(shù),各子類(lèi)主要是實(shí)現(xiàn)對(duì)應(yīng)的,這里定義了種類(lèi)豐富的各類(lèi)Function的子接口類(lèi)來(lái)適配各種不同的加工場(chǎng)景,具體的就看源碼了,這里就不詳細(xì)介紹了
本節(jié)的最后,我們通過(guò)一個(gè)例子來(lái)看看這幾個(gè)類(lèi)是怎么組合的。如下是一個(gè)常見(jiàn)的對(duì)DataStream進(jìn)行map處理的操作
text
.flatMap(new Tokenizer)
處理后對(duì)應(yīng)的DataStream的結(jié)構(gòu)如下圖

DataStream生成提交執(zhí)行的Graph
前面分析了DataStream,是單個(gè)節(jié)點(diǎn)的,接下來(lái)看看整個(gè)streaming flow在flink中是怎么轉(zhuǎn)換為可以執(zhí)行的邏輯的。一般整個(gè)數(shù)據(jù)流我們叫做DAG,那在Flink中叫PipeLine,其實(shí)現(xiàn)類(lèi)是StreamGraph。這里先介紹2個(gè)概念
- StreamNode:streaming流中的一個(gè)節(jié)點(diǎn),代表對(duì)應(yīng)的算子
- StreamEdge:Graph中的邊,來(lái)連接上下游的StreamNode
如上圖所示,圓形為StreamNode,箭頭為StreamEdge,這樣通過(guò)這2者就可以構(gòu)建一個(gè)StreamGraph了。
StreamGraph是最原始的Graph,而其中會(huì)做一些優(yōu)化生成JobGraph,最后會(huì)生成待執(zhí)行的ExecutionGraph,這里我們先介紹下基礎(chǔ)概念,后面會(huì)深入介紹相關(guān)的內(nèi)容。 - JobGraph: 優(yōu)化后的StreamGraph,具體做的優(yōu)化就是把相連的算子,如果支持chaining的,合并到一個(gè)StreamNode;
- ExecutionGraph: 和JobGraph結(jié)構(gòu)一致
StreamGraph
下面我們來(lái)看看StreamGraph的主要屬性和方法,以及如何從DataStream轉(zhuǎn)換為StreamGraph的。
屬性和方法
重要屬性如下(這里只介紹與生成圖相關(guān)的屬性,還有一些如狀態(tài),存儲(chǔ)類(lèi)的后面介紹)
| 屬性 | 說(shuō)明 |
|---|---|
| Map<Integer, StreamNode> streamNodes | StreamNode數(shù)據(jù),kv格式,key為T(mén)ransformation的id |
| Set sources | StreamGraph的所有source集合,存儲(chǔ)的是Transformation的id |
| Set sinks | StreamGraph的sink集合 |
說(shuō)明:StreamGraph只記錄了StreamNode的信息,StreamEdge的信息是記錄在StreamNode中的。如下2個(gè)屬性記錄了StreamNode的輸入Edge和輸出Edge
//StreamNode.java
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
主要方法
| 方法 | 說(shuō)明 |
|---|---|
| addSource() | 添加source節(jié)點(diǎn) |
| addSink() | 添加sink節(jié)點(diǎn) |
| addOperator() | 添加算子節(jié)點(diǎn) |
| addVirtualSideOutputNode() | 添加一個(gè)虛擬的siteOutput節(jié)點(diǎn) |
StreamGraph生成
下面我們來(lái)看看DataStream是如何生成StreamGraph的。通過(guò)前面對(duì)DataStream的分析可知,DataStream的前后依賴(lài)關(guān)系是通過(guò)Transformation來(lái)存儲(chǔ)的,這里StreamExecutionEnvironment有個(gè)transformations記錄了所有的Transformation
//StreamExecutionEnvironment.java
List<Transformation<?>> transformations
這里的數(shù)據(jù)是在DataStream進(jìn)行轉(zhuǎn)換處理生成了新的Transformation,同時(shí)會(huì)把該實(shí)例添加到transformations里面,使用的是如下方法
getExecutionEnvironment().addOperator(resultTransform);
而具體轉(zhuǎn)換為StreamGraph是通過(guò)StreamExecutionEnvironment的 getStreamGraph()方法。最終轉(zhuǎn)換的邏輯是通過(guò)StreamGraphGenerator類(lèi)來(lái)實(shí)現(xiàn)。
這里要介紹一個(gè)新的類(lèi)體系TransformationTranslator,有各種的子類(lèi)來(lái)轉(zhuǎn)換對(duì)應(yīng)類(lèi)型的Transformation。這里有定義了2個(gè)方法分別支持轉(zhuǎn)換Streaming和Batch。
//TransformationTranslator.java Collection<Integer> translateForBatch(final T transformation, final Context context); Collection<Integer> translateForStreaming(final T transformation, final Context context);
對(duì)應(yīng)的映射關(guān)系存儲(chǔ)在StreamGraphGenerator類(lèi)的translatorMap中。
//StreamGraphGenerator.java
private static final Map<
Class<? extends Transformation>,
TransformationTranslator<?, ? extends Transformation>>
translatorMap;
static {
@SuppressWarnings("rawtypes")
Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
tmp = new HashMap<>();
tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
...
下面我們通過(guò)OneInputTransformationTranslator為例來(lái)看看是如何進(jìn)行轉(zhuǎn)換的。具體邏輯如下
//調(diào)用addOperator添加StreamNode
streamGraph.addOperator(
transformationId,
slotSharingGroup,
transformation.getCoLocationGroupKey(),
operatorFactory,
inputType,
transformation.getOutputType(),
transformation.getName());
//獲取上游依賴(lài)的transformations,然后添加邊
final List<Transformation<?>> parentTransformations = transformation.getInputs();
for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
streamGraph.addEdge(inputId, transformationId, 0);
}
除了添加節(jié)點(diǎn)和邊外,還有一些如設(shè)置節(jié)點(diǎn)的并行度等操作,這塊大家可以去看看具體的代碼。
這樣當(dāng)把所有的Transformation都轉(zhuǎn)換完,這樣StreamGraph就生成好了。
JobGraph
有了StreamGraph,為什么還需要一個(gè)JobGraph呢,這個(gè)和Spark中的Stage類(lèi)似,如果有多個(gè)算子能夠合并到一起處理,那這樣性能可以提高很多。所以這里 根據(jù)一定的規(guī)則進(jìn)行,先我們介紹相關(guān)的類(lèi)
- JobVertex:job的頂點(diǎn),即對(duì)應(yīng)的計(jì)算邏輯(這里用的是Vertex, 而前面用的是Node,有點(diǎn)差異),通過(guò)inputs記錄了所有來(lái)源的Edge,而輸出是ArrayList來(lái)記錄
- JobEdge: job的邊,記錄了源Vertex和咪表Vertex.
- IntermediateDataSet: 定義了一個(gè)中間數(shù)據(jù)集,但并沒(méi)有存儲(chǔ),只是記錄了一個(gè)Producer(JobVertex)和一個(gè)Consumer(JobEdge)
主要的概念就這些,下面我們看看JobGraph的結(jié)構(gòu)以及如何從StreamGraph轉(zhuǎn)換為JobGraph
屬性和方法
JobGraph的屬性主要是通過(guò)Map<JobVertexID, JobVertex> taskVertices記錄了JobVertex的信息。
另外這個(gè)JobGraph是提交到集群去執(zhí)行的,所以會(huì)有一些執(zhí)行相關(guān)的信息,相關(guān)的如下:
private JobID jobID;
private final String jobName;
private SerializedValue<ExecutionConfig> serializedExecutionConfig;
/** Set of JAR files required to run this job. */
private final List<Path> userJars = new ArrayList<Path>();
/** Set of custom files required to run this job. */
private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts =
new HashMap<>();
/** Set of blob keys identifying the JAR files required to run this job. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
/** List of classpaths required to run this job. */
private List<URL> classpaths = Collections.emptyList();
而相關(guān)的方法主要是
| 方法 | 說(shuō)明 |
|---|---|
| addVertex() | 添加頂點(diǎn) |
| getVertices() | 獲取頂點(diǎn) |
而如何從StreamGraph轉(zhuǎn)換到JobGraph這塊的內(nèi)容還是比較多,這塊后續(xù)我們單獨(dú)開(kāi)一篇來(lái)介紹
總結(jié)
本篇從0開(kāi)始介紹了DataStream的相關(guān)內(nèi)容,并深入介紹了DataStream、Transformation、StreamOperator和Function之間的關(guān)系。另外介紹了streaming flow轉(zhuǎn)換為提交執(zhí)行的StreamGraph的過(guò)程及StreamGraph的存儲(chǔ)結(jié)構(gòu)。而從StreamGraph->JobGraph->ExecutionGraph這塊涉及的內(nèi)容也較多,且還涉及到提交部署的內(nèi)容,這塊后面單獨(dú)來(lái)介紹。最后本篇介紹的DataStream只是介紹了最基礎(chǔ)的計(jì)算框架,沒(méi)有涉及到flink的streaming flow中的時(shí)間、狀態(tài)、window等內(nèi)容,更多關(guān)于Flink DataStream基礎(chǔ)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
JDK動(dòng)態(tài)代理之WeakCache緩存的實(shí)現(xiàn)機(jī)制
這篇文章主要介紹了JDK動(dòng)態(tài)代理之WeakCache緩存的實(shí)現(xiàn)機(jī)制2018-02-02
Java實(shí)戰(zhàn)寵物醫(yī)院預(yù)約掛號(hào)系統(tǒng)的實(shí)現(xiàn)流程
只學(xué)書(shū)上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+JSP+Spring+SpringBoot+MyBatis+html+layui+maven+Mysql實(shí)現(xiàn)一個(gè)寵物醫(yī)院預(yù)約掛號(hào)系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2022-01-01
eclipse 如何創(chuàng)建 user library 方法詳解
這篇文章主要介紹了eclipse 如何創(chuàng)建 user library 方法詳解的相關(guān)資料,需要的朋友可以參考下2017-04-04
解析Spring事件發(fā)布與監(jiān)聽(tīng)機(jī)制
本篇文章給大家介紹Spring事件發(fā)布與監(jiān)聽(tīng)機(jī)制,通過(guò) ApplicationEvent 事件類(lèi)和 ApplicationListener 監(jiān)聽(tīng)器接口,可以實(shí)現(xiàn) ApplicationContext 事件發(fā)布與處理,需要的朋友參考下吧2021-06-06
Java實(shí)現(xiàn)HashMap排序方法的示例詳解
這篇文章主要通過(guò)一些示例為大家介紹了Java對(duì)HashMap進(jìn)行排序的方法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解一下2022-05-05

