Implementing Flink WordCount with Java Lambda Expressions
This article walks through implementing Flink's WordCount with Java lambda expressions. The process is explained in detail through sample code and should be a useful reference for study or work; interested readers can follow along.
In this article we implement Flink word counting in Java.
Code development
Environment setup
Import the Flink 1.9 pom dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.7</version>
    </dependency>
</dependencies>
Create the Flink streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Custom source
It generates one line of text per second:
DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
    // cancellation flag; cancel() is called from another thread, hence volatile
    private volatile boolean isCancel = false;
    private String[] words = {
            "important oracle jdk license update",
            "the oracle jdk license has changed for releases starting april 16 2019",
            "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
            "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
            "downloading and using this product an faq is available here ",
            "commercial license and support is available with a low cost java se subscription",
            "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
    };

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit one randomly chosen line of text per second
        while (!isCancel) {
            int randomIndex = RandomUtils.nextInt(0, words.length);
            ctx.collect(words[randomIndex]);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isCancel = true;
    }
});
Word counting
// 3. Word count
// 3.1 Split each line of text into individual words
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // split the line into words
    Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
    });
}).returns(Types.STRING);
// 3.2 Convert each word into a tuple of (word, 1)
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));
// 3.3 Group by word
KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);
// 3.4 Sum the counts within each 3-second window
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));
resultDS.print();
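Note that the code above only declares the dataflow; nothing runs until the job is submitted, which the full listing below does as its final line:
env.execute("app");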
Reference code
import java.util.Arrays;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Custom source - emits one line of text per second
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            // cancellation flag; cancel() is called from another thread, hence volatile
            private volatile boolean isCancel = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // emit one randomly chosen line of text per second
                while (!isCancel) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCancel = true;
            }
        });
        // 3. Word count
        // 3.1 Split each line of text into individual words
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // split the line into words
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);
        // 3.2 Convert each word into a tuple of (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // 3.3 Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);
        // 3.4 Sum the counts within each 3-second window
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));
        resultDS.print();
        // Submit the job; execution starts here
        env.execute("app");
    }
}
Flink's support for Java lambda expressions
Flink supports using lambda expressions with all operators of the Java API. However, when a lambda expression works with Java generics, the type information has to be declared explicitly.
Let's look again at this piece of code from above:
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // split the line into words
    Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
    });
}).returns(Types.STRING);
The type information has to be supplied here because Flink cannot automatically infer the generic type carried by the Collector. Let's take a look at the source code of FlatMapFunction:
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
     * it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *     to fail and may trigger recovery.
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}
Notice that the second parameter of flatMap is Collector<O>, a parameterized generic type. When the Java compiler compiles this code it erases the type parameters, so after compilation the method effectively becomes:
void flatMap(T value, Collector out)
In this situation Flink cannot infer the type information automatically. If we do not explicitly provide the type information, the following error occurs:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
In this case the type information must be specified explicitly; otherwise the output is treated as type Object, which keeps Flink from serializing it correctly.
So we need to declare the parameter types of the lambda expression explicitly and specify its output type via the returns method.
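As the error message itself suggests, an alternative to the returns hint is an (anonymous) class implementing FlatMapFunction: a class keeps its generic signature after compilation, so Flink can extract the output type on its own. A minimal sketch of that variant, equivalent to the lambda above:
SingleOutputStreamOperator<String> wordsDS = wordLineDS
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) {
                // split the line and emit each word; no .returns(...) hint is needed
                for (String word : line.split(" ")) {
                    out.collect(word);
                }
            }
        });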
Let's look at another piece of code:
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));
Why does map also need a type hint?
Because map returns a Tuple2 here, and Tuple2 carries generic parameters that are likewise erased at compile time, Flink cannot infer the output type correctly on its own.
For more on Flink's support for Java lambda expressions, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html
That's all for this article. I hope it helps with your study, and thanks for supporting 腳本之家.