Springboot集成SpringBatch批處理組件
1.Spring Batch 簡(jiǎn)介
Spring Batch 是 Spring 生態(tài)系統(tǒng)中的??企業(yè)級(jí)批處理框架??,專門(mén)設(shè)計(jì)用于處理大規(guī)模數(shù)據(jù)作業(yè)。它提供了批處理應(yīng)用所需的核心功能,解決了傳統(tǒng)批處理應(yīng)用開(kāi)發(fā)中的重復(fù)性問(wèn)題,使開(kāi)發(fā)人員能夠?qū)W⒂跇I(yè)務(wù)邏輯而非基礎(chǔ)設(shè)施。
核心價(jià)值與定位??
??問(wèn)題解決??:自動(dòng)化處理??周期性的、數(shù)據(jù)密集型的??任務(wù)(如報(bào)表生成、數(shù)據(jù)遷移、對(duì)賬結(jié)算)
??典型場(chǎng)景??:
每月財(cái)務(wù)報(bào)表生成
銀行日終批量交易處理
電商平臺(tái)每日用戶行為分析
百萬(wàn)級(jí)數(shù)據(jù)遷移(如舊系統(tǒng)到新系統(tǒng))
2.批處理工具架構(gòu)和示例
| 項(xiàng) | 接口 |
|---|---|
| 讀 | ItemReader |
| 處理 | ItemProcessor |
| 寫(xiě) | ItemWriter |
項(xiàng)目結(jié)構(gòu)

依賴包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SpringBatcher</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-boot.version>3.5.3</spring-boot.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>4.0.3</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope> <!-- 通常只需運(yùn)行時(shí)依賴 -->
</dependency>
</dependencies>
</project>
啟動(dòng)類
package org.example;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableBatchProcessing
@SpringBootApplication
public class BatchApp {
public static void main(String[] args) {
SpringApplication.run(BatchApp.class, args);
}
}
2.1 批處理任務(wù)持久化控制
示例代碼基于 H2 存儲(chǔ)
package org.example.config;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
* @Author zhx && moon
* @Since 21
* @Date 2025-06-20 PM 4:21
*/
@Configuration
public class BatchJobConfig {
@Bean
public JobRepository jobRepository(DataSource dataSource) throws Exception {
JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean();
bean.setDataSource(dataSource);
bean.setDatabaseType("H2");
bean.setTransactionManager(new DataSourceTransactionManager(dataSource));
bean.afterPropertiesSet();
return bean.getObject();
}
}
2.2 實(shí)現(xiàn)一個(gè)讀取器
以 Excel 文件讀取為例
package org.example.job.common;
import com.alibaba.excel.EasyExcel;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.InitializingBean;
import java.io.File;
import java.util.List;
/**
* @Author zhx && moon
* @Since 21
* @Date 2025-06-24 PM 2:16
*/
public class EasyExcelItemReader<T> implements ItemReader<T>, InitializingBean {
private final Class<T> clazz;
private final String filePath;
private List<T> cacheList;
private int index = 0;
public EasyExcelItemReader(Class<T> clazz, String filePath) {
this.clazz = clazz;
this.filePath = filePath;
}
@Override
public void afterPropertiesSet() {
try {
// 一次性讀取Excel所有數(shù)據(jù)(適用于中小文件)
cacheList = EasyExcel.read(new File(filePath))
.head(clazz)
.sheet()
.headRowNumber(1) // 跳過(guò)標(biāo)題行
.doReadSync();
} catch (Exception e) {
throw new RuntimeException("read excel failed ", e);
}
}
@Override
public T read() {
if (index < cacheList.size()) {
return cacheList.get(index++);
}
// 重置讀取的位置
index = 0;
return null;
}
}
2.3 定義批處理JOB
package org.example.job;
import org.example.entity.User;
import org.example.job.common.EasyExcelItemReader;
import org.springframework.batch.core.*;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
/**
* @Author zhx && moon
* @Since 21
* @Date 2025-06-24 PM 2:08
*/
@Component
public class SVCJob {
/**
* Excel 讀取
* @return
*/
@Bean("easyExcelItemReader")
public EasyExcelItemReader<User> easyExcelItemReader() {
return new EasyExcelItemReader<>(User.class, "C:\\Users\\Administrator\\Desktop\\Test.xlsx");
}
/**
* 數(shù)據(jù)處理器 對(duì)讀取的數(shù)據(jù)進(jìn)行加工
* @return
*/
@Bean("getNameProcessors")
public ItemProcessor<User, String> getNameProcessors() {
return item -> {
return item.getName();
};
}
/**
* 配置寫(xiě)入器(保持不變)
* @return
*/
@Bean("nameWriter")
public ItemWriter<String> nameWriter() {
return items -> {
for (String item : items) {
System.out.println("User Name: " + item);
}
};
}
/**
* 配置批處理步驟(使用新版API)
* @param jobRepository
* @param transactionManager
* @param reader
* @param processor
* @param writer
* @return
*/
@Bean("easyExcelStep")
public Step easyExcelStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
@Qualifier("easyExcelItemReader") EasyExcelItemReader<User> reader,
@Qualifier("getNameProcessors") ItemProcessor<User, String> processor,
@Qualifier("nameWriter") ItemWriter<String> writer) {
return new StepBuilder("easyExcelStep", jobRepository)
.<User, String>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipLimit(1)
.skip(IllegalArgumentException.class)
.listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("start to processor data ...");
}
})
.build();
}
/**
* 配置批處理作業(yè)
* @param jobRepository
* @param importStep
* @return
*/
@Bean("easyExcelImportJobs")
public Job customerImportJob(JobRepository jobRepository, @Qualifier("easyExcelStep") Step importStep) {
return new JobBuilder("easyExcelImportJobs", jobRepository)
.incrementer(new RunIdIncrementer())
.start(importStep)
.listener(new JobExecutionListener() {
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("Job Finished!State: " + jobExecution.getStatus());
}
})
.build();
}
}
2.4數(shù)據(jù)實(shí)體
package org.example.entity;
import com.alibaba.excel.annotation.ExcelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @Author zhx && moon
* @Since 21
* @Date 2025-06-24 PM 2:20
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
@ExcelProperty("姓名")
private String name;
@ExcelProperty("編號(hào)")
private String employeeId;
@ExcelProperty("年齡")
private Integer age;
}
2.5 接口類
package org.example.controller;
import jakarta.annotation.Resource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author zhx && moon
* @Since 21
* @Date 2025-06-23 PM 4:40
*/
@RestController
@RequestMapping("/job")
public class JobManage {
@Autowired
private JobLauncher jobLauncher;
@Resource(name = "easyExcelImportJobs")
Job job;
@GetMapping("/start")
public void start(){
try {
JobParameters params = new JobParametersBuilder()
.addLong("uniqueId", System.nanoTime())
.toJobParameters();
jobLauncher.run(job, params);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
2.6 H2 配置
spring:
datasource:
url: jdbc:h2:file:Z:/IdeaProjects/SpringBatcher/SpringBatcher/springbatchdb #jdbc:h2:tcp://localhost/mem:springbatchdb;DB_CLOSE_DELAY=-1 #jdbc:h2:mem:springbatchdb
driver-class-name: org.h2.Driver
username: sa
password: sa
h2:
console:
enabled: true
path: /h2/db-console
settings:
web-allow-others: true
batch:
jdbc:
initialize-schema: always
2.7 H2 數(shù)據(jù)庫(kù)腳本
-- Autogenerated: do not edit this file CREATE TABLE BATCH_JOB_INSTANCE ( JOB_INSTANCE_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY , VERSION BIGINT , JOB_NAME VARCHAR(100) NOT NULL, JOB_KEY VARCHAR(32) NOT NULL, constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY) ) ; CREATE TABLE BATCH_JOB_EXECUTION ( JOB_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY , VERSION BIGINT , JOB_INSTANCE_ID BIGINT NOT NULL, CREATE_TIME TIMESTAMP(9) NOT NULL, START_TIME TIMESTAMP(9) DEFAULT NULL , END_TIME TIMESTAMP(9) DEFAULT NULL , STATUS VARCHAR(10) , EXIT_CODE VARCHAR(2500) , EXIT_MESSAGE VARCHAR(2500) , LAST_UPDATED TIMESTAMP(9), constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID) references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID) ) ; CREATE TABLE BATCH_JOB_EXECUTION_PARAMS ( JOB_EXECUTION_ID BIGINT NOT NULL , PARAMETER_NAME VARCHAR(100) NOT NULL , PARAMETER_TYPE VARCHAR(100) NOT NULL , PARAMETER_VALUE VARCHAR(2500) , IDENTIFYING CHAR(1) NOT NULL , constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ; CREATE TABLE BATCH_STEP_EXECUTION ( STEP_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY , VERSION BIGINT NOT NULL, STEP_NAME VARCHAR(100) NOT NULL, JOB_EXECUTION_ID BIGINT NOT NULL, CREATE_TIME TIMESTAMP(9) NOT NULL, START_TIME TIMESTAMP(9) DEFAULT NULL , END_TIME TIMESTAMP(9) DEFAULT NULL , STATUS VARCHAR(10) , COMMIT_COUNT BIGINT , READ_COUNT BIGINT , FILTER_COUNT BIGINT , WRITE_COUNT BIGINT , READ_SKIP_COUNT BIGINT , WRITE_SKIP_COUNT BIGINT , PROCESS_SKIP_COUNT BIGINT , ROLLBACK_COUNT BIGINT , EXIT_CODE VARCHAR(2500) , EXIT_MESSAGE VARCHAR(2500) , LAST_UPDATED TIMESTAMP(9), constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ; CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT ( STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, SHORT_CONTEXT VARCHAR(2500) NOT NULL, SERIALIZED_CONTEXT LONGVARCHAR , constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID) references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID) ) ; CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT ( JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, SHORT_CONTEXT VARCHAR(2500) NOT NULL, SERIALIZED_CONTEXT LONGVARCHAR , constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ; CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ; CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ; CREATE SEQUENCE BATCH_JOB_SEQ;
3.測(cè)試
啟動(dòng)服務(wù)

測(cè)試 H2 連接

測(cè)試數(shù)據(jù)

觸發(fā) JOB

JOB 執(zhí)行記錄

到此這篇關(guān)于Springboot集成SpringBatch批處理組件的文章就介紹到這了,更多相關(guān)SpringBatch批處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot啟動(dòng)mongoDB報(bào)錯(cuò)之禁用mongoDB自動(dòng)配置問(wèn)題
這篇文章主要介紹了springboot啟動(dòng)mongoDB報(bào)錯(cuò)之禁用mongoDB自動(dòng)配置問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
SpringBoot實(shí)現(xiàn)監(jiān)控Actuator,關(guān)閉redis監(jiān)測(cè)
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)監(jiān)控Actuator,關(guān)閉redis監(jiān)測(cè),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
兼容Spring Boot 1.x和2.x配置類參數(shù)綁定的工具類SpringBootBindUtil
今天小編就為大家分享一篇關(guān)于兼容Spring Boot 1.x和2.x配置類參數(shù)綁定的工具類SpringBootBindUtil,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12
SpringMVC的處理器適配器-HandlerAdapter的用法及說(shuō)明
這篇文章主要介紹了SpringMVC的處理器適配器-HandlerAdapter的用法及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12
RocketMQ4.5.X 實(shí)現(xiàn)修改生產(chǎn)者消費(fèi)者日志保存路徑
這篇文章主要介紹了RocketMQ4.5.X 實(shí)現(xiàn)修改生產(chǎn)者消費(fèi)者日志保存路徑方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
java重寫(xiě)(@Override)介紹以及舉例說(shuō)明
這篇文章主要給大家介紹了關(guān)于java重寫(xiě)(@Override)介紹以及舉例說(shuō)明的相關(guān)資料,在Java中@Override注解用于表示方法重寫(xiě)(覆蓋)了父類的方法,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01

