Flink入門級應(yīng)用域名處理示例
概述
最近做了一個小任務(wù),要使用Flink處理域名數(shù)據(jù),在4GB的域名文檔中求出每個域名的頂級域名,最后輸出每個頂級域名下的前10個子級域名。一個比較簡單的入門級Flink應(yīng)用,代碼很容易寫,主要用到的算子有FlatMap、KeyBy、Reduce。但是由于Maven打包問題,總是提示找不到入口類,卡了好久,最后也是成功解決了。
主體代碼如下:
public class FlinkStreamingTopDomain {
public static void main(String[] args) throws Exception{
// 獲取流處理運行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 獲取kafkaConsumer
FlinkKafkaConsumer<String> kafkaConsumer = FlinkUtil.getKafkaConsumer("ahl_test1", "console-consumer-72096");
// 從當前消費組下標開始讀取
kafkaConsumer.setStartFromEarliest();
DataStreamSource text = env.addSource(kafkaConsumer);
// 算子
DataStream<Tuple2<String,String>> windowCount = text.flatMap(new FlatMap())
.keyBy(0).reduce(new Reduce());
//把數(shù)據(jù)打印到控制臺
windowCount.print()
.setParallelism(16);//使用16個并行度
//注意:因為flink是懶加載的,所以必須調(diào)用execute方法,上面的代碼才會執(zhí)行
env.execute("streaming topDomain calculate");
}
}
算子
FlatMap
Flatmap是對一行字符進行處理的,官網(wǎng)上的解釋如下
FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
其實和Hadoop的Map差不多,都是把一行字符串進行處理,得到我們想要的<key,value>,不同之處在于Map處理后得到的是<key,values[]>。即Hadoop的Map操作會按key自動的將value處理成數(shù)組的形式,而Flink的FlatMap算子只會把每行數(shù)據(jù)處理成key、value。
下面是我處理業(yè)務(wù)的FlatMap代碼
// FlatMap分割域名,并輸出二元組<頂級域名,域名>
public static class FlatMap implements FlatMapFunction<String, Tuple2<String,String>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, String>> out) throws Exception {
String[] values = s.split("\\^"); // 按字符^分割
if(values.length - 1 < 2) {
return;
}
String domain = values[2];
out.collect(new Tuple2<String,String>(ToolUtil.getTopDomain(domain),domain));
}
}
我這里把數(shù)據(jù)處理成了二元組形式,之后reduce也是對這個二元組進行處理。
KeyBy
先來看看官網(wǎng)的解釋
KeyBy
DataStream → KeyedStream
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.
This transformation returns a KeyedStream, which is, among other things, required to use keyed state.
dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
Attention:A type cannot be a key if:
1.it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
2.it is an array of any type.
keyBy會按照一個keySelector定義的方式進行哈希分區(qū),會將一個流分成多個Partition,相同key的會被分在同一個分區(qū),經(jīng)過keyBy的流變成KeyedStream。
需要注意的有兩點:
1.pojo類型作為key,必須重寫hashcode()方法
2.數(shù)組類型不能作為key
Reduce
官網(wǎng)的解釋如下
Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
reduce是進行”滾動“處理的,即reduce方法的第一個參數(shù)是當前已經(jīng)得到的結(jié)果記為currentResult,第二個參數(shù)是當前要處理的<key,value>。流式計算會一條一條的處理數(shù)據(jù),每處理完一條數(shù)據(jù)就得到新的currentResult。
業(yè)務(wù)處理代碼如下
// 拼接同一分區(qū)下的ip
public static class Reduce implements ReduceFunction<Tuple2<String,String>>{
@Override
public Tuple2<String,String> reduce(Tuple2 t1, Tuple2 t2) throws Exception {
String[] domains = t1.f1.toString().split("\\^");
if(domains.length == 10){
return t1;
}
t1.f1 = t1.f1.toString() + "^" + t2.f1.toString();
System.out.println(t1.f1 );
return t1;
}
}
連接socket測試
1.將主體代碼里的kafka獲取數(shù)據(jù),改成socket獲取數(shù)據(jù)
// int port;
// try {
// ParameterTool parameterTool = ParameterTool.fromArgs(args);
// port = parameterTool.getInt("port");
// } catch (Exception e){
// System.out.println("沒有指定port參數(shù),使用默認值1112");
// port = 1112;
// }
// 連接socket獲取輸入數(shù)據(jù)
// DataStreamSource<String> text = env.socketTextStream("192.168.3.221",port);
2.在服務(wù)器開啟一個端口號:nc -l -p 1112
3.運行代碼
4.服務(wù)器輸入測試數(shù)據(jù)就可以實時的獲取處理結(jié)果
連接kafka
正式
使用kafka命令創(chuàng)建主題
kafka-topics.sh --create --zookeeper IP1:2181 IP2:2181... --replication-factor 2 --partitions 16 --topic ahl_test
kafka建立topic需要先開啟zookeeper
運行生產(chǎn)者jar包,用生產(chǎn)者讀取數(shù)據(jù)
java -jar $jar包路徑 $topic $path
測試
另外,還可以使用測試生產(chǎn)者實現(xiàn)和socket測試相同的效果
/kafka-console-producer.sh --broker-list slave3:9092 --topic ahl_test1
打包上傳服務(wù)器
打包上傳服務(wù)器注意不要使用idea提供的build方式,反正我使用build會一直報錯找不到主類,即便我反編譯jar包發(fā)現(xiàn)主類在里面,并且MF文件也有配置主類信息。這個問題卡了我很久,最后我使用mvn pakage的方式打包并運行成功,把我的打包插件貼出來幫助遇到和我相同問題的人
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- <createDependencyReducedPom>false</createDependencyReducedPom>-->
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.ncs.flink.streaming.FlinkStreamingTopDomain</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Flink運行指令為:
/home/soft/flink-1.12.0//bin/flink run -c com.ncs.flink.streaming.FlinkStreamingDomainJob /home/ahl/flink/situation-mapred-flink-0.0.1-SNAPSHOT.jar
或者可以訪問Flink集群的8081端口,在提供的UI頁面上傳運行

以上就是Flink入門級應(yīng)用域名處理示例的詳細內(nèi)容,更多關(guān)于Flink域名處理的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java中int轉(zhuǎn)string與string轉(zhuǎn)int的效率對比
這篇文章主要介紹了java中int轉(zhuǎn)string與string轉(zhuǎn)int的效率對比,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
Java將Object轉(zhuǎn)換為數(shù)組的代碼
這篇文章主要介紹了Java將Object轉(zhuǎn)換為數(shù)組的情況,今天在使用一個別人寫的工具類,這個工具類,主要是判空操作,包括集合、數(shù)組、Map等對象是否為空的操作,需要的朋友可以參考下2022-09-09
簡單談?wù)刯ava中匿名內(nèi)部類構(gòu)造函數(shù)
這篇文章主要簡單給我們介紹了java中匿名內(nèi)部類構(gòu)造函數(shù),并附上了簡單的示例,有需要的小伙伴可以參考下。2015-11-11
Zuul 實現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié)
這篇文章主要介紹了Zuul 實現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
基于@RequestParam注解之Spring MVC參數(shù)綁定的利器
這篇文章主要介紹了基于@RequestParam注解之Spring MVC參數(shù)綁定的利器,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2025-03-03
Spring?Boot自動配置的原理及@Conditional條件注解
這篇文章主要介紹了Spring?Boot自動配置的原理及@Conditional條件注解,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,感興趣的朋友可以參考一下2022-07-07

