Spring Boot Integration with Apache Flink: A Detailed Tutorial
I. Background and Goals
Apache Flink is a high-performance distributed stream-processing framework, while Spring Boot makes it easy to build enterprise-grade applications quickly. Integrating the two lets you:
- Use Spring Boot's dependency injection and configuration management to simplify Flink job development
- Embed stream processing into the Spring ecosystem as part of a complete microservice architecture
- Submit and manage jobs dynamically
II. Prerequisites
- JDK 17+
- Maven 3.8+
- Spring Boot 3.1.5
- Flink 1.17.2
III. Creating the Project and Adding Dependencies
1. Create the Spring Boot project
Generate a base project with Spring Initializr, selecting:
- Maven
- Spring Web (optional, for exposing REST endpoints)
2. Add the Flink dependencies
<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Flink core dependencies. 'provided' assumes a Flink cluster supplies these at runtime;
         to run jobs inside the Spring Boot application itself, use the default (compile) scope instead. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- Required for local execution (embedded mini cluster); without it,
         env.execute() fails with "No ExecutorFactory found to execute the application" -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.2</version>
    </dependency>
</dependencies>

IV. Basic Integration Example
1. Write the Flink streaming job
// src/main/java/com/example/demo/flink/WordCountJob.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountJob {

    public static void execute() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded in-memory source for the demo; a real job would typically read from Kafka, a socket, etc.
        DataStream<String> text = env.fromElements(
                "Spring Boot integration with Flink",
                "Flink real-time stream processing",
                "Spring ecosystem integration"
        );

        // Split each line on whitespace, emit (word, 1), then sum the counts per word
        DataStream<WordCount> counts = text
                .flatMap(new FlatMapFunction<String, WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordCount(word, 1L));
                        }
                    }
                })
                .keyBy(value -> value.word)
                .sum("count");

        counts.print();
        env.execute("Spring Boot Flink Job");
    }

    // Simple POJO: public fields and a no-arg constructor let Flink treat it as a POJO type
    public static class WordCount {
        public String word;
        public long count;

        public WordCount() {}

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

2. Launch the job from Spring Boot
// src/main/java/com/example/demo/DemoApplication.java
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.example.demo.flink.WordCountJob;

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Launch the Flink job at startup; note that execute() blocks until the job finishes
        WordCountJob.execute();
    }
}

V. Advanced Integration: Submitting Jobs Dynamically via a REST API
1. Create the job submission service
// src/main/java/com/example/demo/service/FlinkJobService.java
import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Service;

@Service
public class FlinkJobService {

    public String submitWordCountJob(List<String> inputLines) {
        try {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> text = env.fromCollection(inputLines);
            // ... (same WordCount pipeline as above)

            // execute() blocks until the job completes; see the non-blocking sketch below
            JobExecutionResult result = env.execute();
            return "JobID: " + result.getJobID();
        } catch (Exception e) {
            return "Job Failed: " + e.getMessage();
        }
    }
}
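One caveat with the service above: env.execute() blocks the HTTP thread until the job completes. A minimal non-blocking sketch using Flink's executeAsync(), which submits the job and returns a JobClient handle immediately (AsyncFlinkJobService and submitWordCountJobAsync are illustrative names, not part of the tutorial's code; executeAsync() and JobClient are Flink's own API):

// Hypothetical non-blocking variant of the service above
import java.util.List;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Service;

@Service
public class AsyncFlinkJobService {

    public String submitWordCountJobAsync(List<String> inputLines) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromCollection(inputLines);
        // ... build the same WordCount pipeline as in WordCountJob ...
        text.print(); // placeholder sink so the sketch stays runnable on its own

        // executeAsync() submits the job and returns immediately with a handle to it
        JobClient client = env.executeAsync("REST-submitted WordCount");
        return "JobID: " + client.getJobID();
    }
}

The returned JobClient can also be kept (for example in a map keyed by job ID) so that later requests can query or cancel the job via its getJobStatus() and cancel() methods.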
2. Create the REST controller
// src/main/java/com/example/demo/controller/JobController.java
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import com.example.demo.service.FlinkJobService;

@RestController
@RequestMapping("/jobs")
public class JobController {

    @Autowired
    private FlinkJobService flinkJobService;

    @PostMapping("/wordcount")
    public String submitWordCount(@RequestBody List<String> inputs) {
        return flinkJobService.submitWordCountJob(inputs);
    }
}

VI. Key Configuration Notes
1. application.properties
# Custom settings for the local Flink execution environment
spring.flink.local.enabled=true
spring.flink.job.name=SpringBootFlinkJob
# Parallelism, usually tuned to the number of CPU cores
spring.flink.parallelism=4
Note that these are application-defined keys: neither Spring Boot nor Flink reads them automatically, so the application has to bind them itself and apply them to the StreamExecutionEnvironment, as sketched below.
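A minimal binding sketch for the keys above, assuming a hypothetical FlinkProperties class whose nested Local and Job classes mirror the dotted key structure:

// src/main/java/com/example/demo/config/FlinkProperties.java (hypothetical binding class)
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "spring.flink")
public class FlinkProperties {

    private final Local local = new Local();
    private final Job job = new Job();

    /** Default parallelism, applied via env.setParallelism(...). */
    private int parallelism = 4;

    public Local getLocal() { return local; }
    public Job getJob() { return job; }
    public int getParallelism() { return parallelism; }
    public void setParallelism(int parallelism) { this.parallelism = parallelism; }

    /** Binds spring.flink.local.* */
    public static class Local {
        private boolean enabled = true;
        public boolean isEnabled() { return enabled; }
        public void setEnabled(boolean enabled) { this.enabled = enabled; }
    }

    /** Binds spring.flink.job.* */
    public static class Job {
        private String name = "SpringBootFlinkJob";
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }
}

Register it with @EnableConfigurationProperties(FlinkProperties.class) (or @ConfigurationPropertiesScan) on the application class, inject it into FlinkJobService, and apply it with env.setParallelism(props.getParallelism()) and env.execute(props.getJob().getName()).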
2. Resolve dependency conflicts
Exclude conflicting transitive dependencies in pom.xml, for example:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.17.2</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

VII. Running and Verifying
Start the Spring Boot application:
mvn spring-boot:run
Submit a job through the API:
curl -X POST -H "Content-Type: application/json" \
  -d '["Hello Flink", "Spring Boot Integration"]' \
  http://localhost:8080/jobs/wordcount
Check the console output:
Flink> Spring : 1
Flink> Boot : 1
Flink> Integration : 1
...
VIII. Production Considerations
- Cluster deployment: submit the packaged jar to a Flink cluster:
  flink run -c com.example.demo.DemoApplication your-application.jar
  (In practice the jar submitted to the cluster is usually a plain shaded jar whose main class builds the job graph, rather than the Spring Boot repackaged jar, since Flink's classloader does not read the nested BOOT-INF layout.)
- State management: integrate a Flink state backend such as RocksDB (see the sketch after this list)
- Monitoring: expose metrics through Micrometer and Spring Boot Actuator
- Resource isolation: deploy with the YARN or Kubernetes deployment modes
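For the state-management point above, here is a minimal sketch of enabling checkpointing with the RocksDB state backend. It assumes the flink-statebackend-rocksdb dependency (matching the Flink version) is on the classpath; the CheckpointedEnvironmentFactory class name and the checkpoint path are placeholders:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedEnvironmentFactory {

    /** Returns an environment configured for fault-tolerant, stateful processing. */
    public static StreamExecutionEnvironment create() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep keyed state in RocksDB so it can grow beyond the JVM heap
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // Take a checkpoint every 60 seconds with exactly-once guarantees
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Durable checkpoint storage (placeholder path; use HDFS/S3 in production)
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");

        return env;
    }
}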
IX. Complete Project Structure
src/
├── main/
│   ├── java/
│   │   ├── com/example/demo/
│   │   │   ├── DemoApplication.java
│   │   │   ├── flink/
│   │   │   │   └── WordCountJob.java
│   │   │   ├── controller/
│   │   │   ├── service/
│   ├── resources/
│   │   └── application.properties
pom.xml
With the steps above, you have a working integration of Spring Boot and Apache Flink. This architecture is a good fit when real-time stream processing needs to be embedded in a microservice landscape, such as real-time risk-control systems or IoT data-processing platforms, and it can later be extended with big-data components like Kafka and HBase.