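How to Write a Custom ItemReader in Spring Batch
Custom ItemReader in Spring Batch
Spring Batch supports a variety of input sources, such as files and databases. Sometimes, however, you run into a data source that is not supported out of the box, and you then need to implement your own: a custom ItemReader. This article shows how to do that with an example.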
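Creating a custom ItemReader
Creating a custom ItemReader takes two steps:
- Create a class that implements the ItemReader interface, supplying the type of the returned object, T, as the type parameter.
- Implement the interface's T read() method according to the rule below.
The read() method returns the next object if one exists; otherwise it returns null.
Below we write a custom ItemReader that returns the student information for an online testing course as StuDto objects. To keep things simple, the data is stored in memory. StuDto is a simple data transfer object: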
import lombok.Data;

// Lombok's @Data generates the getters and setters used by the reader below
@Data
public class StuDto {
    private String emailAddress;
    private String name;
    private String purchasedPackage;
}
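Follow these steps to create the ItemReader:
- Create the InMemoryStudentReader class.
- Implement the ItemReader interface, with StuDto as the returned object type.
- Add a List<StuDto> studentData field holding the students enrolled in the course.
- Add a nextStudentIndex field holding the index of the next StuDto object.
- Add a private initialize() method that builds the student data and sets the index to 0.
- Add a constructor that calls initialize().
- Implement read() according to this rule: if there is a next student, return that StuDto object and increment the index by one; otherwise return null.
The InMemoryStudentReader code is as follows: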
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.springframework.batch.item.ItemReader;

public class InMemoryStudentReader implements ItemReader<StuDto> {

    // Index of the next student to return
    private int nextStudentIndex;
    private List<StuDto> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    // Builds the in-memory student list and resets the index
    private void initialize() {
        StuDto tony = new StuDto();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StuDto nick = new StuDto();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StuDto ian = new StuDto();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }

    @Override
    public StuDto read() throws Exception {
        StuDto nextStudent = null;
        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }
        // null tells Spring Batch that the input is exhausted
        return nextStudent;
    }
}
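To see why the null return value matters, here is a minimal sketch of a caller that drains a reader in fixed-size chunks, which is roughly what a chunk-oriented step does with it. The sketch deliberately avoids Spring so that it runs on its own: SimpleReader, ListReader and readChunks are hypothetical stand-ins for illustration, not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ItemReader<T>: read() returns null when the input is exhausted.
interface SimpleReader<T> {
    T read() throws Exception;
}

// Hypothetical in-memory reader using the same index-based approach as InMemoryStudentReader.
class ListReader<T> implements SimpleReader<T> {
    private final List<T> items;
    private int nextIndex = 0;

    ListReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() {
        if (nextIndex < items.size()) {
            return items.get(nextIndex++);
        }
        return null; // signals end of input
    }
}

class ReadLoopDemo {
    // Drains the reader, grouping items into chunks of at most chunkSize.
    static <T> List<List<T>> readChunks(SimpleReader<T> reader, int chunkSize) throws Exception {
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        T item;
        while ((item = reader.read()) != null) {
            current.add(item);
            if (current.size() == chunkSize) {
                chunks.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            chunks.add(current); // final, possibly smaller chunk
        }
        return chunks;
    }

    public static void main(String[] args) throws Exception {
        SimpleReader<String> reader = new ListReader<>(Arrays.asList("Tony", "Nick", "Ian"));
        System.out.println(readChunks(reader, 2)); // prints [[Tony, Nick], [Ian]]
    }
}
```

If read() never returned null, the loop above would never end, which is why the contract is stated so explicitly.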
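Once the custom ItemReader exists, it has to be configured as a bean so that a Spring Batch job can use it. The configuration is shown next.
Configuring the ItemReader bean
The configuration class is as follows: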
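import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InMemoryStudentJobConfig {

    @Bean
    ItemReader<StuDto> inMemoryStudentReader() {
        return new InMemoryStudentReader();
    }
}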
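Annotate the class with @Configuration to mark it as a configuration class, add a method whose return type is ItemReader<StuDto>, annotate that method with @Bean, and implement it by returning an InMemoryStudentReader object.
Summary
This article showed by example how to write a custom ItemReader. The three key points are:
- A custom ItemReader must implement the ItemReader interface.
- When implementing the ItemReader interface, specify the returned type as the type parameter (T).
- Implement the interface's read method so that it returns the next object if one exists and null otherwise.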
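Spring Batch ItemReader
This part focuses on ItemReader: how to read data from different data sources, and how exception handling and restart work. The examples are written in the Spring Batch 4.x configuration style (JobBuilderFactory and StepBuilderFactory).
JdbcPagingItemReader
Reading data from a database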
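import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DBJdbcDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job DBJdbcDemoJob() {
        return jobBuilderFactory.get("DBJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();
    }

    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100); // JDBC fetch size, so rows are retrieved in batches

        // Map each result-set row to a Customer
        reader.setRowMapper((rs, rowNum) -> {
            return Customer.builder().id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build();
        });

        // The query provider builds the paged SELECT statements for MySQL
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");

        // Paging needs a sort key so that every page has a well-defined starting point
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);
        return reader;
    }
}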
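The sort key is what makes paging possible: each page starts just after the last key value returned by the previous page. The sketch below simulates that idea over an in-memory list of ids instead of a database. KeysetPagingDemo, fetchPage and fetchAllPages are hypothetical names used only for illustration, and the SQL mentioned in the comment describes the general pattern rather than the exact statement the query provider generates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class KeysetPagingDemo {
    // Simulates "WHERE id > lastId ORDER BY id ASC LIMIT pageSize" over ids sorted ascending.
    // A null lastId means "first page".
    static List<Long> fetchPage(List<Long> sortedIds, Long lastId, int pageSize) {
        List<Long> page = new ArrayList<>();
        for (Long id : sortedIds) {
            if (lastId != null && id <= lastId) {
                continue; // already returned by an earlier page
            }
            page.add(id);
            if (page.size() == pageSize) {
                break;
            }
        }
        return page;
    }

    // Fetches page after page, remembering the last key seen, until a page comes back empty.
    static List<List<Long>> fetchAllPages(List<Long> sortedIds, int pageSize) {
        List<List<Long>> pages = new ArrayList<>();
        Long lastId = null;
        while (true) {
            List<Long> page = fetchPage(sortedIds, lastId, pageSize);
            if (page.isEmpty()) {
                break;
            }
            pages.add(page);
            lastId = page.get(page.size() - 1);
        }
        return pages;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 5L, 8L);
        System.out.println(fetchAllPages(ids, 2)); // prints [[1, 2], [3, 5], [8]]
    }
}
```

Because each page depends only on the last key of the previous one, the sort key should be unique; with duplicate values, rows sharing the boundary value could be skipped.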
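The Job and the ItemWriter are not the focus of this article. The writer below is only an example, and the writers in the following examples work the same way.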
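import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component("dbJdbcDemoWriter")
public class DbJdbcDemoWriter implements ItemWriter<Customer> {

    // Prints each customer of the chunk to standard output
    @Override
    public void write(List<? extends Customer> items) throws Exception {
        for (Customer customer : items) {
            System.out.println(customer);
        }
    }
}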
FlatFileItemReader
Reading data from a CSV file
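import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class FlatFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Bean
    public Job flatFileDemoJob() {
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build();
    }

    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1); // skip the header line

        // Split each line on the delimiter and name the resulting fields
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        // Turn the named fields into a Customer
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
        return reader;
    }
}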
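The line mapper works in two stages: a tokenizer splits the line into named fields, and a field-set mapper turns those fields into an object. The following sketch reproduces the first stage in plain Java so the idea can be tried without Spring. CsvLineDemo and tokenize are hypothetical names, the sample line is made up, and unlike DelimitedLineTokenizer this simplified version does not handle quoted fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CsvLineDemo {
    // Column names, in the same order as in the tokenizer configuration above
    static final String[] NAMES = {"id", "firstName", "lastName", "birthdate"};

    // Splits a comma-delimited line and pairs each token with its column name.
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty fields
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "Expected " + NAMES.length + " tokens but got " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], tokens[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> fields = tokenize("1,Tony,Tester,1990-01-01 10:00:00");
        long id = Long.parseLong(fields.get("id")); // typed access, like fieldSet.readLong("id")
        System.out.println(id + " " + fields.get("firstName")); // prints 1 Tony
    }
}
```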
StaxEventItemReader
Reading data from an XML file
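import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class XmlFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter;

    @Bean
    public Job xmlFileDemoJob() {
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build();
    }

    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer, Customer>chunk(10)
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("customer.xml"));
        // Each <customer> element is treated as one item
        reader.setFragmentRootElementName("customer");

        // Map the "customer" element name to the Customer class
        XStreamMarshaller unMarshaller = new XStreamMarshaller();
        Map<String, Class<?>> map = new HashMap<>();
        map.put("customer", Customer.class);
        unMarshaller.setAliases(map);

        reader.setUnmarshaller(unMarshaller);
        return reader;
    }
}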
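To make the notion of a fragment root element more concrete, here is a small sketch that uses the JDK's own StAX API to walk an XML document and pick one value out of every customer element. It is not how StaxEventItemReader is implemented; StaxFragmentDemo and firstNames are hypothetical names, and the XML layout is an assumption, since the article does not show customer.xml.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

class StaxFragmentDemo {
    // Collects the <firstName> text of every <customer> fragment in the document.
    static List<String> firstNames(String xml) throws Exception {
        XMLStreamReader reader =
                XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
        List<String> names = new ArrayList<>();
        boolean inCustomer = false;
        try {
            while (reader.hasNext()) {
                int event = reader.next();
                if (event == XMLStreamConstants.START_ELEMENT) {
                    String name = reader.getLocalName();
                    if ("customer".equals(name)) {
                        inCustomer = true; // a new fragment starts here
                    } else if (inCustomer && "firstName".equals(name)) {
                        names.add(reader.getElementText());
                    }
                } else if (event == XMLStreamConstants.END_ELEMENT
                        && "customer".equals(reader.getLocalName())) {
                    inCustomer = false; // fragment complete
                }
            }
        } finally {
            reader.close();
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Assumed layout: a root element wrapping repeated <customer> elements
        String xml = "<customers>"
                + "<customer><id>1</id><firstName>Tony</firstName></customer>"
                + "<customer><id>2</id><firstName>Nick</firstName></customer>"
                + "</customers>";
        System.out.println(firstNames(xml)); // prints [Tony, Nick]
    }
}
```

Streaming through the file this way keeps memory use flat, because only the current fragment has to be held at any time.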
MultiResourceItemReader
Reading data from multiple files
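import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

@Configuration
public class MultipleFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    // All classpath files matching file*.csv
    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;

    @Bean
    public Job multipleFileDemoJob() {
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build();
    }

    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer, Customer>chunk(50)
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    // Reads the input files one after another, delegating each file to flatFileReader()
    private MultiResourceItemReader<Customer> multipleResourceItemReader() {
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
        reader.setDelegate(flatFileReader());
        reader.setResources(inputFiles);
        return reader;
    }

    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        // Replaced by MultiResourceItemReader with each input file in turn
        reader.setResource(new ClassPathResource("customer.csv"));
        // reader.setLinesToSkip(1);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
        return reader;
    }
}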
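Exception handling and restart
For chunk-oriented steps, Spring Batch provides tools for managing state. State within a step is managed through the ItemStream interface, which gives developers access to the component that holds the state. That component is the ExecutionContext, which is essentially a map of key-value pairs storing the state of a particular step. The ExecutionContext makes it possible to restart a step, because its state is persisted in the JobRepository.
When an error occurs during execution, the last committed state is what remains stored in the JobRepository. On the next run of the job, that state is used to populate the ExecutionContext, and the step can then continue from where it left off.
The ItemStream interface has three methods:
open() is called at the start of the step and receives the ExecutionContext, populated with any values previously saved in the database;
update() is called at the end of each chunk or transaction to update the ExecutionContext;
close() is called once all chunks have been processed.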
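The open, update and close lifecycle can be tried out without Spring. In the sketch below a plain Map plays the role of the persisted ExecutionContext, and CheckpointedReader and LifecycleDemo are hypothetical classes written only for this illustration. The first run commits after two items and then "fails"; the second run resumes from the committed position.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reader whose position survives a restart through a context map.
class CheckpointedReader {
    private static final String KEY = "position";
    private final List<String> lines;
    private int position;

    CheckpointedReader(List<String> lines) {
        this.lines = lines;
    }

    // Called once when the step starts: resume from the saved position, if any.
    void open(Map<String, Object> context) {
        Object saved = context.get(KEY);
        position = (saved == null) ? 0 : (Integer) saved;
    }

    String read() {
        return position < lines.size() ? lines.get(position++) : null;
    }

    // Called at each commit point: record how far the reader has got.
    void update(Map<String, Object> context) {
        context.put(KEY, position);
    }

    void close() {
        // nothing to release in this in-memory sketch
    }
}

class LifecycleDemo {
    // Runs a failing first attempt and returns the first item read by the second attempt.
    static String firstItemAfterRestart() {
        Map<String, Object> context = new HashMap<>(); // stand-in for the persisted ExecutionContext
        List<String> lines = Arrays.asList("a", "b", "c", "d");

        CheckpointedReader first = new CheckpointedReader(lines);
        first.open(context);
        first.read();
        first.read();
        first.update(context); // commit after two items
        first.read();          // third item is read, but the run fails before the next commit
        first.close();

        CheckpointedReader second = new CheckpointedReader(lines);
        second.open(context);
        return second.read();
    }

    public static void main(String[] args) {
        System.out.println(firstItemAfterRestart()); // prints c
    }
}
```

The third item is read twice overall, once in each run, because only committed progress is remembered.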

Let's build an example.
Prepare a CSV file and put an invalid name in its 33rd record. When the reader reaches that record, it throws an exception and the program stops.

ItemReader test code
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {

    private static final String CUR_LINE_KEY = "curLine";

    // Number of lines read successfully so far
    private Long curLine = 0L;
    private final FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();

    public RestartDemoReader() {
        reader.setResource(new ClassPathResource("restartDemo.csv"));
        // The position is tracked here through curLine, so the delegate keeps no state of its own
        reader.setSaveState(false);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
    }

    @Override
    public Customer read() throws Exception {
        Customer customer = reader.read();
        if (customer == null) {
            return null; // end of file
        }
        // When the name is "wrongName", throw explicitly to stop the job
        if ("wrongName".equals(customer.getFirstName())) {
            throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
        }
        this.curLine++;
        return customer;
    }

    /**
     * Decides whether this is a restart of the job and positions the file reader accordingly.
     * @param executionContext the step's execution context
     * @throws ItemStreamException if the underlying file cannot be opened
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CUR_LINE_KEY)) {
            // Restart: continue after the lines that were already committed
            this.curLine = executionContext.getLong(CUR_LINE_KEY);
            System.out.println("Start reading from line: " + (this.curLine + 1));
        } else {
            this.curLine = 0L;
            executionContext.putLong(CUR_LINE_KEY, this.curLine);
        }
        // Open the file once per step, skipping the lines handled by a previous run
        reader.setLinesToSkip(this.curLine.intValue());
        reader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("update curLine: " + this.curLine);
        executionContext.putLong(CUR_LINE_KEY, this.curLine);
    }

    @Override
    public void close() throws ItemStreamException {
        reader.close();
    }
}
Job configuration
Records are read in chunks of 10.
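import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RestartDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;

    @Bean
    public Job restartDemoJob() {
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
    }

    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer, Customer>chunk(10) // commit after every 10 records
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}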
On the first run, the program throws an exception at line 33, and the persisted curLine value is 30.

At this point we can query the BATCH_STEP_EXECUTION_CONTEXT table in the database and see that curLine has been persisted there as a key-value pair. (The step reads in chunks of 10, so when record 33 fails, the last committed curLine value is 30.)
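The value 30 follows from simple arithmetic: only whole chunks are committed, so the saved position is the largest multiple of the chunk size below the failing record. A tiny sketch, assuming every record before the failing one was processed successfully (CommitMath and lastCommitted are hypothetical names):

```java
class CommitMath {
    // Records covered by fully committed chunks when the 1-based record failingRecord fails.
    static int lastCommitted(int failingRecord, int chunkSize) {
        // Integer division drops the partially processed chunk
        return ((failingRecord - 1) / chunkSize) * chunkSize;
    }

    public static void main(String[] args) {
        System.out.println(lastCommitted(33, 10)); // prints 30
    }
}
```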

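Next, we correct the wrongName record and run the program again.
The open method runs and checks whether the map persisted for the step contains curLine. If it does, this is a rerun: the reader loads curLine and continues from the chunk that failed.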



