SpringBoot集成flink全過程
SpringBoot集成flink
Flink是一個(gè)批處理和流處理結(jié)合的統(tǒng)一計(jì)算框架,其核心是一個(gè)提供了數(shù)據(jù)分發(fā)以及并行化計(jì)算的流數(shù)據(jù)處理引擎。
最大亮點(diǎn)是流處理,最適合的應(yīng)用場景是低時(shí)延的數(shù)據(jù)處理。
場景
高并發(fā)pipeline處理數(shù)據(jù),時(shí)延毫秒級,且兼具可靠性。
環(huán)境搭建
①、安裝flink
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
②、安裝Netcat
Netcat(又稱為NC)是一個(gè)計(jì)算機(jī)網(wǎng)絡(luò)工具,它可以在兩臺(tái)計(jì)算機(jī)之間建立 TCP/IP 或 UDP 連接。
用于測試網(wǎng)絡(luò)中的端口,發(fā)送文件等操作。
進(jìn)行網(wǎng)絡(luò)調(diào)試和探測,也可以進(jìn)行加密連接和遠(yuǎn)程管理等高級網(wǎng)絡(luò)操作
yum install -y nc # 安裝nc命令 nc -lk 8888 # 啟動(dòng)socket端口
無界流之讀取socket文本流
一、依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 添加 Flink 依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>1.17.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.et.flink.job.SocketJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>二、SoketJob
public class SocketJob{
public static void main(String[] args)throws Exception{
// 創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定并行度,默認(rèn)電腦線程數(shù)
env.setParallelism(3);
// 讀取數(shù)據(jù)socket文本流 指定監(jiān)聽 IP 端口 只有在接收到數(shù)據(jù)才會(huì)執(zhí)行任務(wù)
DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);
// 處理數(shù)據(jù): 切換、轉(zhuǎn)換、分組、聚合 得到統(tǒng)計(jì)結(jié)果
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
.flatMap(
(String value, Collector<Tuple2<String, Integer>> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
)
.setParallelism(2)
// // 顯式地提供類型信息:對于flatMap傳入Lambda表達(dá)式,系統(tǒng)只能推斷出返回的是Tuple2類型,而無法得到Tuple2<String, Long>。只有顯式設(shè)置系統(tǒng)當(dāng)前返回類型,才能正確解析出完整數(shù)據(jù)
.returns(new TypeHint<Tuple2<String, Integer>>() {
})
// .returns(Types.TUPLE(Types.STRING,Types.INT))
.keyBy(value -> value.f0)
.sum(1);
// 輸出
sum.print();
// 執(zhí)行
env.execute();
}
}測試:
啟動(dòng)socket流:
nc -l 8888
本地執(zhí)行:直接ideal啟動(dòng)main程序,在socket流中輸入
abc bcd cde bcd cde fgh cde fgh hij

集群執(zhí)行:
執(zhí)行maven打包,將打包的jar上傳到集群中

總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring Boot Maven Plugin打包異常解決方案
這篇文章主要介紹了Spring Boot Maven Plugin打包異常解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11
SpringBoot整合sharding-jdbc實(shí)現(xiàn)自定義分庫分表的實(shí)踐
本文主要介紹了SpringBoot整合sharding-jdbc實(shí)現(xiàn)自定義分庫分表的實(shí)踐,將通過自定義算法來實(shí)現(xiàn)定制化的分庫分表來擴(kuò)展相應(yīng)業(yè)務(wù),感興趣的可以了解一下2021-11-11
Java中String、StringBuffer和StringBuilder的區(qū)別
這篇文章主要介紹了Java中String、StringBuffer和StringBuilder的區(qū)別,StringBuilder與StringBuffer都繼承自AbstractStringBuilder類,在AbstractStringBuilder中也是使用字符數(shù)組保存字符串char[]value但是沒有final關(guān)鍵字修飾,所以這兩個(gè)可變,需要的朋友可以參考下2024-01-01
解決swagger主頁訪問,返回報(bào)錯(cuò)500問題
在使用Swagger時(shí)遇到500錯(cuò)誤,通過仔細(xì)的debug發(fā)現(xiàn)問題源于注解使用不當(dāng),具體表現(xiàn)為一個(gè)接口的入?yún)⒈诲e(cuò)誤地注解了三個(gè)參數(shù),而實(shí)際上只有兩個(gè),這導(dǎo)致了Swagger在解析時(shí)拋出了NullPointerException異常,解決方法是刪除錯(cuò)誤的第三個(gè)參數(shù)的注解2024-09-09
SpringBoot實(shí)現(xiàn)文件上傳下載功能小結(jié)
最近做的一個(gè)項(xiàng)目涉及到文件上傳與下載功能。SpringBoot后臺(tái)如何實(shí)現(xiàn)文件上傳下載呢?下面有單文件上傳和多文件上傳功能,感興趣的朋友一起看看吧2017-08-08
SpringBoot + SpringSecurity 環(huán)境搭建的步驟
這篇文章主要介紹了SpringBoot + SpringSecurity 環(huán)境搭建的步驟,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05
maven pom中內(nèi)置變量及引用的實(shí)現(xiàn)
maven其實(shí)有很多內(nèi)置變量供開發(fā)著在開發(fā)中使用,本文主要介紹了maven pom中內(nèi)置變量及引用的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01
mybatis-plus IdWorker生成的Id和返回給前臺(tái)的不一致的解決
這篇文章主要介紹了mybatis-plus IdWorker生成的Id和返回給前臺(tái)的不一致的解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03

