詳解批處理框架之Spring Batch
一、Spring Batch的概念知識(shí)
1.1、分層架構(gòu)
Spring Batch的分層架構(gòu)圖如下:

可以看到它分為三層,分別是:
Application應(yīng)用層:包含了所有任務(wù)batch jobs和開(kāi)發(fā)人員自定義的代碼,主要是根據(jù)項(xiàng)目需要開(kāi)發(fā)的業(yè)務(wù)流程等。Batch Core核心層:包含啟動(dòng)和管理任務(wù)的運(yùn)行環(huán)境類,如JobLauncher等。Batch Infrastructure基礎(chǔ)層:上面兩層是建立在基礎(chǔ)層之上的,包含基礎(chǔ)的讀入reader和寫(xiě)出writer、重試框架等。
1.2、關(guān)鍵概念
理解下圖所涉及的概念至關(guān)重要,不然很難進(jìn)行后續(xù)開(kāi)發(fā)和問(wèn)題分析。

1.2.1、JobRepository
專門負(fù)責(zé)與數(shù)據(jù)庫(kù)打交道,對(duì)整個(gè)批處理的新增、更新、執(zhí)行進(jìn)行記錄。所以Spring Batch是需要依賴數(shù)據(jù)庫(kù)來(lái)管理的。
1.2.2、任務(wù)啟動(dòng)器JobLauncher
負(fù)責(zé)啟動(dòng)任務(wù)Job。
1.2.3、任務(wù)Job
Job是封裝整個(gè)批處理過(guò)程的單位,跑一個(gè)批處理任務(wù),就是跑一個(gè)Job所定義的內(nèi)容。

上圖介紹了Job的一些相關(guān)概念:
Job:封裝處理實(shí)體,定義過(guò)程邏輯。JobInstance:Job的運(yùn)行實(shí)例,不同的實(shí)例,參數(shù)不同,所以定義好一個(gè)Job后可以通過(guò)不同參數(shù)運(yùn)行多次。JobParameters:與JobInstance相關(guān)聯(lián)的參數(shù)。JobExecution:代表Job的一次實(shí)際執(zhí)行,可能成功、可能失敗。
所以,開(kāi)發(fā)人員要做的事情,就是定義Job。
1.2.4、步驟Step
Step是對(duì)Job某個(gè)過(guò)程的封裝,一個(gè)Job可以包含一個(gè)或多個(gè)Step,一步步的Step按特定邏輯執(zhí)行,才代表Job執(zhí)行完成。

通過(guò)定義Step來(lái)組裝Job可以更靈活地實(shí)現(xiàn)復(fù)雜的業(yè)務(wù)邏輯。
1.2.5、輸入——處理——輸出
所以,定義一個(gè)Job關(guān)鍵是定義好一個(gè)或多個(gè)Step,然后把它們組裝好即可。而定義Step有多種方法,但有一種常用的模型就是輸入——處理——輸出,即Item Reader、Item Processor和Item Writer。比如通過(guò)Item Reader從文件輸入數(shù)據(jù),然后通過(guò)Item Processor進(jìn)行業(yè)務(wù)處理和數(shù)據(jù)轉(zhuǎn)換,最后通過(guò)Item Writer寫(xiě)到數(shù)據(jù)庫(kù)中去。
Spring Batch為我們提供了許多開(kāi)箱即用的Reader和Writer,非常方便。
二、代碼實(shí)例
理解了基本概念后,就直接通過(guò)代碼來(lái)感受一下吧。整個(gè)項(xiàng)目的功能是從多個(gè)csv文件中讀數(shù)據(jù),處理后輸出到一個(gè)csv文件。
2.1、基本框架
添加依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>
需要添加Spring Batch的依賴,同時(shí)使用H2作為內(nèi)存數(shù)據(jù)庫(kù)比較方便,實(shí)際生產(chǎn)肯定是要使用外部的數(shù)據(jù)庫(kù),如Oracle、PostgreSQL。
入口主類:
@SpringBootApplication
@EnableBatchProcessing
public class PkslowBatchJobMain {
public static void main(String[] args) {
SpringApplication.run(PkslowBatchJobMain.class, args);
}
}
也很簡(jiǎn)單,只是在Springboot的基礎(chǔ)上添加注解@EnableBatchProcessing。
領(lǐng)域?qū)嶓w類Employee:
package com.pkslow.batch.entity;
public class Employee {
String id;
String firstName;
String lastName;
}
對(duì)應(yīng)的csv文件內(nèi)容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
2.2、輸入——處理——輸出
2.2.1、讀取ItemReader
因?yàn)橛卸鄠€(gè)輸入文件,所以定義如下:
@Value("input/inputData*.csv")
private Resource[] inputResources;
@Bean
public MultiResourceItemReader<Employee> multiResourceItemReader()
{
MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();
resourceItemReader.setResources(inputResources);
resourceItemReader.setDelegate(reader());
return resourceItemReader;
}
@Bean
public FlatFileItemReader<Employee> reader()
{
FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
//跳過(guò)csv文件第一行,為表頭
reader.setLinesToSkip(1);
reader.setLineMapper(new DefaultLineMapper() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
//字段名
setNames(new String[] { "id", "firstName", "lastName" });
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
{
//轉(zhuǎn)換化后的目標(biāo)類
setTargetType(Employee.class);
}
});
}
});
return reader;
}
這里使用了FlatFileItemReader,方便我們從文件讀取數(shù)據(jù)。
2.2.2、處理ItemProcessor
為了簡(jiǎn)單演示,處理很簡(jiǎn)單,就是把最后一列轉(zhuǎn)為大寫(xiě):
public ItemProcessor<Employee, Employee> itemProcessor() {
return employee -> {
employee.setLastName(employee.getLastName().toUpperCase());
return employee;
};
}
2.2.3、輸出ItremWriter
比較簡(jiǎn)單,代碼及注釋如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv");
@Bean
public FlatFileItemWriter<Employee> writer()
{
FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
writer.setResource(outputResource);
//是否為追加模式
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {
{
//設(shè)置分割符
setDelimiter(",");
setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {
{
//設(shè)置字段
setNames(new String[] { "id", "firstName", "lastName" });
}
});
}
});
return writer;
}
2.3、Step
有了Reader-Processor-Writer后,就可以定義Step了:
@Bean
public Step csvStep() {
return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
.reader(multiResourceItemReader())
.processor(itemProcessor())
.writer(writer())
.build();
}
這里有一個(gè)chunk的設(shè)置,值為5,意思是5條記錄后再提交輸出,可以根據(jù)自己需求定義。
2.4、Job
完成了Step的編碼,定義Job就容易了:
@Bean
public Job pkslowCsvJob() {
return jobBuilderFactory
.get("pkslowCsvJob")
.incrementer(new RunIdIncrementer())
.start(csvStep())
.build();
}
2.5、運(yùn)行
完成以上編碼后,執(zhí)行程序,結(jié)果如下:

成功讀取數(shù)據(jù),并將最后字段轉(zhuǎn)為大寫(xiě),并輸出到outputData.csv文件。
三、監(jiān)聽(tīng)Listener
可以通過(guò)Listener接口對(duì)特定事件進(jìn)行監(jiān)聽(tīng),以實(shí)現(xiàn)更多業(yè)務(wù)功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數(shù)據(jù)等。
我們分別對(duì)Read、Process和Write事件進(jìn)行監(jiān)聽(tīng),對(duì)應(yīng)分別要實(shí)現(xiàn)ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因?yàn)榇a比較簡(jiǎn)單,就是打印一下日志,這里只貼出ItemWriteListener的實(shí)現(xiàn)代碼:
public class PkslowWriteListener implements ItemWriteListener<Employee> {
private static final Log logger = LogFactory.getLog(PkslowWriteListener.class);
@Override
public void beforeWrite(List<? extends Employee> list) {
logger.info("beforeWrite: " + list);
}
@Override
public void afterWrite(List<? extends Employee> list) {
logger.info("afterWrite: " + list);
}
@Override
public void onWriteError(Exception e, List<? extends Employee> list) {
logger.info("onWriteError: " + list);
}
}
把實(shí)現(xiàn)的監(jiān)聽(tīng)器listener整合到Step中去:
@Bean
public Step csvStep() {
return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
.reader(multiResourceItemReader())
.listener(new PkslowReadListener())
.processor(itemProcessor())
.listener(new PkslowProcessListener())
.writer(writer())
.listener(new PkslowWriteListener())
.build();
}
執(zhí)行后看一下日志:

這里就能明顯看到之前設(shè)置的chunk的作用了。Writer每次是處理5條記錄,如果一條輸出一次,會(huì)對(duì)IO造成壓力。
以上就是詳解Spring Batch入門之優(yōu)秀的批處理框架的詳細(xì)內(nèi)容,更多關(guān)于Spring Batch 批處理框架的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 源碼解析springbatch的job運(yùn)行機(jī)制
- spring中jdbcTemplate.batchUpdate的幾種使用情況
- spring?batch線上異常定位記錄
- Spring Batch 如何自定義ItemReader
- 手把手教你搭建第一個(gè)Spring Batch項(xiàng)目的步驟
- 基于Spring Batch向Elasticsearch批量導(dǎo)入數(shù)據(jù)示例
- Spring Batch入門教程篇
- Spring Batch讀取txt文件并寫(xiě)入數(shù)據(jù)庫(kù)的方法教程
- 使用Spring?Batch實(shí)現(xiàn)大數(shù)據(jù)處理的操作方法
相關(guān)文章
解決@Validated注解無(wú)效,嵌套對(duì)象屬性的@NotBlank無(wú)效問(wèn)題
這篇文章主要介紹了解決@Validated注解無(wú)效,嵌套對(duì)象屬性的@NotBlank無(wú)效問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10
Spring Boot 3.x 集成 Eureka Server/Cl
隨著SpringBoot 3.x版本的開(kāi)發(fā)嘗試,本文記錄了在集成Eureka Server/Client時(shí)所遇到的問(wèn)題和解決方案,文中詳細(xì)介紹了搭建服務(wù)、配置文件和測(cè)試步驟,感興趣的朋友跟隨小編一起看看吧2024-09-09
SpringBoot關(guān)于List集合的校驗(yàn)方式
這篇文章主要介紹了SpringBoot關(guān)于List集合的校驗(yàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07
在Java 8中將List轉(zhuǎn)換為Map對(duì)象方法
這篇文章主要介紹了在Java 8中將List轉(zhuǎn)換為Map對(duì)象方法,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-11-11
Java輕松實(shí)現(xiàn)在Excel中添加超鏈接功能
這篇文章主要為大家詳細(xì)介紹了Java如何輕松實(shí)現(xiàn)在Excel中添加超鏈接功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-01-01
IntelliJ IDEA中查看當(dāng)前類的所有繼承關(guān)系圖
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA中查看當(dāng)前類的所有繼承關(guān)系圖,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-10-10
利用Java+MySQL實(shí)現(xiàn)附近功能實(shí)例
現(xiàn)在很多手機(jī)軟件都用附近搜索功能,但具體是怎么實(shí)現(xiàn)的呢?下面這篇文章就來(lái)給大家介紹關(guān)于利用Java+MySQL實(shí)現(xiàn)附近功能的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來(lái)一起看看吧。2017-12-12

