Setting Up and Testing a Flink Development Environment in IDEA
I. IDEA development environment
1. pom file settings
<properties>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <encoding>UTF-8</encoding>
  <scala.version>2.11.12</scala.version>
  <scala.binary.version>2.11</scala.binary.version>
  <hadoop.version>2.7.6</hadoop.version>
  <flink.version>1.6.1</flink.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.22</version>
  </dependency>
</dependencies>
<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.0</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <!-- <arg>-make:transitive</arg> -->
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.18.1</version>
      <configuration>
        <useFile>false</useFile>
        <disableXmlReport>true</disableXmlReport>
        <includes>
          <include>**/*Test.*</include>
          <include>**/*Suite.*</include>
        </includes>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <!-- Entry point of the packaged job; change it to the main class you want to run -->
                <mainClass>com.xyg.streaming.SocketWindowWordCountScala</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
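2. Flink development workflow
Flink uses the special classes DataSet and DataStream to represent data in a program. You can think of them as immutable collections of data that may contain duplicates. A DataSet holds a finite amount of data, whereas the number of elements in a DataStream can be unbounded.
These collections differ from regular Java collections in some key ways. First, they are immutable: once created, elements cannot be added or removed. You also cannot simply inspect the elements inside.
A collection is initially created by adding a source to a Flink program, and new collections are derived from existing ones by transforming them with API methods such as map, filter, and so on.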
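As a rough analogy in plain Java (this is not Flink API; the class and method names are made up for illustration), the sketch below derives a new list from an unmodifiable one with map and filter and leaves the source untouched, which mirrors how a transformation on a DataSet or DataStream yields a new collection:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class; uses only the JDK, not Flink.
class DerivedCollections {
    // Derives a new list from source; source itself is never modified.
    static List<Integer> parseAndKeepEven(List<String> source) {
        return source.stream()
                .map(Integer::parseInt)      // plays the role of a map transformation
                .filter(n -> n % 2 == 0)     // plays the role of a filter transformation
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> source = Collections.unmodifiableList(Arrays.asList("1", "2", "3", "4"));
        List<Integer> derived = parseAndKeepEven(source);
        System.out.println(source);   // prints [1, 2, 3, 4]
        System.out.println(derived);  // prints [2, 4]
    }
}
```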
Flink programs look like regular programs that transform collections of data. Each program consists of the same basic parts:
1. Obtain an execution environment:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2. Load or create the initial data:
DataStream<String> text = env.readTextFile("file:///path/to/file");
3. Specify transformations on this data (Scala syntax shown here):
val mapped = input.map { x => x.toInt }
4. Specify where to put the results of the computation, for example:
writeAsText(String path)
print()
5. Trigger program execution.
Calling execute() from the IDE runs the program in local mode:
execute()
To run on a cluster, package the program as a jar and submit it:
./bin/flink run \
  -m node21:8081 \
  ./examples/batch/WordCount.jar \
  --input hdfs:///user/admin/input/wc.txt \
  --output hdfs:///user/admin/output2
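Step 5 matters because a DataStream program only describes a dataflow; nothing is processed until execute() is called. Java streams behave similarly, so the plain-JDK sketch below (not Flink API; LazyPipeline and its members are hypothetical names) may help to picture it: the map step is declared first and only runs when the terminal operation is invoked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration class; uses only the JDK, not Flink.
class LazyPipeline {
    // Records which elements the map step has actually processed.
    static final List<String> processed = new ArrayList<>();

    // Declares the source and the transformation only; nothing is processed yet.
    static Stream<Integer> build(List<String> input) {
        return input.stream().map(s -> {
            processed.add(s);
            return Integer.parseInt(s);
        });
    }

    public static void main(String[] args) {
        Stream<Integer> plan = build(Arrays.asList("1", "2", "3"));
        System.out.println(processed.size());  // prints 0: pipeline declared, not run

        // The terminal operation plays the role of execute() in this analogy.
        List<Integer> result = plan.collect(Collectors.toList());
        System.out.println(processed.size());  // prints 3
        System.out.println(result);            // prints [1, 2, 3]
    }
}
```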
II. WordCount example
1. Scala code
package com.xyg.streaming

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Author: Mr.Deng
 * Date: 2018/10/15
 * Desc:
 */
object SocketWindowWordCountScala {

  // Data type holding a word and its number of occurrences
  // (declared at object level rather than inside main)
  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    // port is the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }
    // Get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Connect to the socket to receive the input data
    val text = env.socketTextStream("node21", port, '\n')
    // This import brings in the implicit conversions; without it the flatMap call fails to compile
    import org.apache.flink.api.scala._
    // Parse the data, group it, window it, and aggregate with SUM
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")
    // Print the result with a parallelism of 1
    windowCounts.print().setParallelism(1)
    env.execute("Socket Window WordCount")
  }
}
2. Java code
package com.xyg.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author: Mr.Deng
 * Date: 2018/10/15
 * Desc: Uses Flink to count words in real time within a given window and prints the result.
 * First run nc -l 9000 on node21.
 */
public class StreamingWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        // Port of the socket to connect to
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port argument specified, using the default 9000");
            port = 9000;
        }
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Connect to the socket to receive the input data
        DataStreamSource<String> text = env.socketTextStream("node21", port, "\n");
        // Compute the word counts
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }) // Flatten: turn each line into <word, count> records
                // Group records with the same word
                .keyBy("word")
                // Window size (2 s) and slide interval (1 s)
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");
        // Print to the console with a parallelism of 1
        windowCount.print().setParallelism(1);
        // Note: Flink evaluates lazily, so the code above only runs once execute() is called
        env.execute("streaming word count");
    }

    /**
     * Holds a word and its number of occurrences.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
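The two-argument timeWindow(size, slide) call defines overlapping sliding windows, so each word contributes to size/slide window results. The following plain-Java sketch shows the idea of assigning a timestamp to every window that covers it. It is an illustration under simplifying assumptions (non-negative millisecond timestamps, windows aligned to multiples of the slide), not Flink's own code, and SlidingWindows and windowStarts are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; uses only the JDK, not Flink.
class SlidingWindows {
    // Returns the start times (ms) of every window [start, start + size) that contains timestamp.
    // Assumes timestamp >= 0 and window starts aligned to multiples of slide.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size = 2 s, slide = 1 s, as in the Java program
        System.out.println(windowStarts(7500, 2000, 1000)); // prints [7000, 6000]
        // size = 5 s, slide = 1 s, as in the Scala program
        System.out.println(windowStarts(7500, 5000, 1000)); // prints [7000, 6000, 5000, 4000, 3000]
    }
}
```

With the Java settings a word typed once is therefore reported in two consecutive window results, and with the Scala settings in five.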
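3. Running the test
First, start a local listener with the nc command:
[admin@node21 ~]$ nc -l 9000
You can check port 9000 with netstat: netstat -anlp | grep 9000. If starting the listener fails with "-bash: nc: command not found", install nc first, for example with yum -y install nc.
Then run the Flink example program in IDEA, type some text into the nc session on node21, and watch the word counts appear in the IDEA console.
4. Cluster test
Here the official example is tested on a single machine:
[admin@node21 flink-1.6.1]$ pwd
/opt/flink-1.6.1
[admin@node21 flink-1.6.1]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node21.
Starting taskexecutor daemon on host node21.
[admin@node21 flink-1.6.1]$ jps
StandaloneSessionClusterEntrypoint
TaskManagerRunner
Jps
[admin@node21 flink-1.6.1]$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected.
Words are counted in 5-second time windows (processing time, tumbling windows) and printed to stdout. Monitor the TaskManager's output file (log/flink-*-taskexecutor-*.out under the Flink directory) and type some text into nc; each line is sent to Flink when you press Enter.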
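III. Developing batch programs with IDEA
DataSet programs are a common kind of Flink program. A data set is initialized from a source, for example by reading a file or from a serialized collection. It is then transformed through transformations (filtering, mapping, joining, grouping), and the result is stored through a sink, which can write to a distributed file system such as HDFS or print to the console. Flink programs can run in several ways, such as locally, on a Flink cluster, or on YARN.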
1. Scala program
package com.xyg.batch

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/**
 * Author: Mr.Deng
 * Date: 2018/10/19
 * Desc:
 */
object WordCountScala {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Load the data from string elements
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    // Split the lines into words, map to (word, 1) tuples, group by key, and sum the counts per word
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    // Print the result (in the DataSet API, print() also triggers execution)
    counts.print()
  }
}
2. Java program
package com.xyg.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Author: Mr.Deng
 * Date: 2018/10/19
 * Desc:
 */
public class WordCountJava {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Build the data set from string elements
        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
        // Split the lines into words, group by key, and count the occurrences of each key
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);
        // Print the result (in the DataSet API, print() also triggers execution)
        wordCounts.print();
    }

    // Splits a line into (word, 1) tuples.
    // It splits on single spaces only, so case and punctuation stay attached to the words.
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
3. Running
Run either program directly in IDEA; the word counts are printed to the console.
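To check the counts without starting Flink, the sketch below reproduces the tokenization of the Scala program (lowercase, split on non-word characters, drop empty tokens) in plain Java. ExpectedWordCounts is a hypothetical helper that uses only the JDK, and it sorts the keys for readability, whereas Flink prints its results in no particular order.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper class; uses only the JDK, not Flink.
class ExpectedWordCounts {
    // Lowercases each line, splits on non-word characters, drops empty tokens, and counts per word.
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for stable printing
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?"));
        // prints {hear=1, ho=1, i=2, s=2, stand=1, them=1, there=2, think=1, who=2}
    }
}
```

The Java program above splits on single spaces only, so its keys keep case and punctuation (for example "Who's" and "there?") and its output differs from these counts.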
