Spring Batch讀取txt文件并寫入數(shù)據(jù)庫的方法教程
項(xiàng)目需求
近日需要實(shí)現(xiàn)用戶推薦相關(guān)的功能,也就是說向用戶推薦他可能喜歡的東西。
我們的數(shù)據(jù)分析工程師會(huì)將用戶以及用戶可能喜歡的東西整理成文檔給我,我只需要將數(shù)據(jù)從文檔中讀取出來,然后對(duì)數(shù)據(jù)進(jìn)行進(jìn)一步的清洗(例如去掉特殊符號(hào),長度如果太長則截?。?。然后將處理后的數(shù)據(jù)存入數(shù)據(jù)庫(Mysql)。
所以分為三步:
- 讀取文檔獲得數(shù)據(jù)
- 對(duì)獲得的數(shù)據(jù)進(jìn)行處理
- 更新數(shù)據(jù)庫(新增或更新)
考慮到這個(gè)數(shù)據(jù)量以后會(huì)越來越大,這里沒有使用 poi 來讀取數(shù)據(jù),而直接使用了 SpringBatch。
實(shí)現(xiàn)步驟
本文假設(shè)讀者已經(jīng)能夠使用 SpringBoot 連接處理 Mysql,所以這部分文中會(huì)省略。
1、創(chuàng)建 Maven 項(xiàng)目,并在 pom.xml 中添加依賴
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.2.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.2.0</version> </dependency> <!-- 工具類依賴--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.12.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <!-- 數(shù)據(jù)庫相關(guān)依賴 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.26</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
這里是這個(gè)小項(xiàng)目中用到的所有依賴,包括連接數(shù)據(jù)庫的依賴以及工具類等。
2、編寫 Model 類
我們要從文檔中讀取的有效列就是 uid,tag,type,就是用戶 ID,用戶可能包含的標(biāo)簽(用于推送),用戶類別(用戶用戶之間互相推薦)。
UserMap.java 中的 @Entity,@Column 注解,是為了利用 JPA 生成數(shù)據(jù)表而寫的,可要可不要。
UserMap.java
@Data
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
//@Entity(name = "user_map")
public class UserMap extends BaseModel {
@Column(name = "uid", unique = true, nullable = false)
private Long uid;
@Column(name = "tag")
private String tag;
@Column(name = "type")
private Integer type;
}
3、實(shí)現(xiàn)批處理配置類
BatchConfiguration.java
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("prodDataSource")
DataSource prodDataSource;
@Bean
public FlatFileItemReader<UserMap> reader() {
FlatFileItemReader<UserMap> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("c152.txt"));
reader.setLineMapper(new DefaultLineMapper<UserMap>() {{
setLineTokenizer(new DelimitedLineTokenizer("|") {{
setNames(new String[]{"uid", "tag", "type"});
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<UserMap>() {{
setTargetType(UserMap.class);
}});
}});
return reader;
}
@Bean
public JdbcBatchItemWriter<UserMap> importWriter() {
JdbcBatchItemWriter<UserMap> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("INSERT INTO user_map (uid,tag,type) VALUES (:uid, :tag,:type)");
writer.setDataSource(prodDataSource);
return writer;
}
@Bean
public JdbcBatchItemWriter<UserMap> updateWriter() {
JdbcBatchItemWriter<UserMap> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("UPDATE user_map SET type = (:type),tag = (:tag) WHERE uid = (:uid)");
writer.setDataSource(prodDataSource);
return writer;
}
@Bean
public UserMapItemProcessor processor(UserMapItemProcessor.ProcessStatus processStatus) {
return new UserMapItemProcessor(processStatus);
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(importStep())
.end()
.build();
}
@Bean
public Step importStep() {
return stepBuilderFactory.get("importStep")
.<UserMap, UserMap>chunk(100)
.reader(reader())
.processor(processor(IMPORT))
.writer(importWriter())
.build();
}
@Bean
public Job updateUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory.get("updateUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(updateStep())
.end()
.build();
}
@Bean
public Step updateStep() {
return stepBuilderFactory.get("updateStep")
.<UserMap, UserMap>chunk(100)
.reader(reader())
.processor(processor(UPDATE))
.writer(updateWriter())
.build();
}
}
prodDataSource 是假設(shè)用戶已經(jīng)設(shè)置好的,如果不知道怎么配置,也可以參考之前的文章進(jìn)行配置:Springboot 集成 Mybatis。
reader(),這方法從文件中讀取數(shù)據(jù),并且設(shè)置了一些必要的參數(shù)。緊接著是寫操作 importWriter() 和 updateWriter() ,讀者看其中一個(gè)就好,因?yàn)槲疫@里是需要更新或者修改的,所以分為兩個(gè)。
processor(ProcessStatus status) ,該方法是對(duì)我們處理數(shù)據(jù)的類進(jìn)行實(shí)例化,這里我根據(jù) status 是 IMPORT 還是 UPDATE 來獲取不同的處理結(jié)果。
其他的看代碼就可以看懂了,哈哈,不詳細(xì)說了。
4、將獲得的數(shù)據(jù)進(jìn)行清洗
UserMapItemProcessor.java
public class UserMapItemProcessor implements ItemProcessor<UserMap, UserMap> {
private static final int MAX_TAG_LENGTH = 200;
private ProcessStatus processStatus;
public UserMapItemProcessor(ProcessStatus processStatus) {
this.processStatus = processStatus;
}
@Autowired
IUserMapService userMapService;
private static final String TAG_PATTERN_STR = "^[a-zA-Z0-9\\u4E00-\\u9FA5_-]+$";
public static final Pattern TAG_PATTERN = Pattern.compile(TAG_PATTERN_STR);
private static final Logger LOG = LoggerFactory.getLogger(UserMapItemProcessor.class);
@Override
public UserMap process(UserMap userMap) throws Exception {
Long uid = userMap.getUid();
String tag = cleanTag(userMap.getTag());
Integer label = userMap.getType() == null ? Integer.valueOf(0) : userMap.getType();
if (StringUtils.isNotBlank(tag)) {
Map<String, Object> params = new HashMap<>();
params.put("uid", uid);
UserMap userMapFromDB = userMapService.selectOne(params);
if (userMapFromDB == null) {
if (this.processStatus == ProcessStatus.IMPORT) {
return new UserMap(uid, tag, label);
}
} else {
if (this.processStatus == ProcessStatus.UPDATE) {
if (!tag.equals(userMapFromDB.getTag()) && !label.equals(userMapFromDB.getType())) {
userMapFromDB.setType(label);
userMapFromDB.setTag(tag);
return userMapFromDB;
}
}
}
}
return null;
}
/**
* 清洗標(biāo)簽
*
* @param tag
* @return
*/
private static String cleanTag(String tag) {
if (StringUtils.isNotBlank(tag)) {
try {
tag = tag.substring(tag.indexOf("{") + 1, tag.lastIndexOf("}"));
String[] tagArray = tag.split(",");
Optional<String> reduce = Arrays.stream(tagArray).parallel()
.map(str -> str.split(":")[0])
.map(str -> str.replaceAll("\'", ""))
.map(str -> str.replaceAll(" ", ""))
.filter(str -> TAG_PATTERN.matcher(str).matches())
.reduce((x, y) -> x + "," + y);
Function<String, String> str = (s -> s.length() > MAX_TAG_LENGTH ? s.substring(0, MAX_TAG_LENGTH) : s);
return str.apply(reduce.get());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
return null;
}
protected enum ProcessStatus {
IMPORT,
UPDATE;
}
public static void main(String[] args) {
String distinctTag = cleanTag("Counter({'《重新定義》': 3, '輕想上的輕小說': 3, '小說': 2, 'Fate': 2, '同人小說': 2, '雪狼八組': 1, " +
"'社會(huì)': 1, '人文': 1, '短篇': 1, '重新定義': 1, 'AMV': 1, '《FBD》': 1, '《雪狼六組》': 1, '戰(zhàn)爭': 1, '《灰羽聯(lián)盟》': 1, " +
"'誰說輕想沒人寫小說': 1})");
System.out.println(distinctTag);
}
}
讀取到的數(shù)據(jù)格式如 main() 方法所示,清理之后的結(jié)果如:
輕想上的輕小說,小說,Fate,同人小說,雪狼八組,社會(huì),人文,短篇,重新定義,AMV,戰(zhàn)爭,誰說輕想沒人寫小說 。
去掉了特殊符號(hào)以及數(shù)字等。使用了 Java8 的 Lambda 表達(dá)式。
并且這里在處理的時(shí)候,判斷如果該數(shù)據(jù)用戶已經(jīng)存在,則進(jìn)行更新,如果不存在,則新增。
5、Job 執(zhí)行結(jié)束回調(diào)類
JobCompletionNotificationListener.java
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
@Autowired
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("end .....");
}
}
具體的邏輯可自行實(shí)現(xiàn)。
完成以上幾個(gè)步驟,運(yùn)行項(xiàng)目,就可以讀取并寫入數(shù)據(jù)到數(shù)據(jù)庫了。

總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對(duì)大家學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
Java實(shí)現(xiàn)無損Word轉(zhuǎn)PDF的示例代碼
本文將利用Java中的兩個(gè)jar包:pdfbox和aspose-words實(shí)現(xiàn)無損Word轉(zhuǎn)PDF功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以動(dòng)手嘗試一下2022-06-06
java計(jì)算日期相差天數(shù)的4種簡單方法舉例
最近在工作中遇見一個(gè)小需求,要求計(jì)算兩個(gè)日期之間相差幾天,下面這篇文章主要給大家介紹了關(guān)于java計(jì)算日期相差天數(shù)的4種簡單方法,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-06-06
Opencv實(shí)現(xiàn)身份證OCR識(shí)別的示例詳解
這篇文章主要為大家詳細(xì)介紹了如何使用Opencv實(shí)現(xiàn)身份證OCR識(shí)別功能,文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以跟隨小編一起了解一下2024-03-03
java拷貝指定目錄下所有內(nèi)容到minIO代碼實(shí)例
這篇文章主要介紹了java拷貝指定目錄下所有內(nèi)容到minIO代碼實(shí)例,創(chuàng)建桶 直接使用工具類先判斷,再創(chuàng)建即可,創(chuàng)建文件夾,需要注意以"/"結(jié)尾,實(shí)際也是在minIO上創(chuàng)建文件,只是作為目錄的表現(xiàn)形式展示,需要的朋友可以參考下2024-01-01
移動(dòng)開發(fā)Spring Boot外置tomcat教程及解決方法
這篇文章主要介紹了移動(dòng)開發(fā)SpringBoot外置tomcat教程,需要的朋友可以參考下2017-11-11
詳談fastjson將對(duì)象格式化成json時(shí)的兩個(gè)問題
下面小編就為大家?guī)硪黄斦刦astjson將對(duì)象格式化成json時(shí)的兩個(gè)問題。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-05-05
Java實(shí)現(xiàn)threadLocal線程池獲取
本文主要介紹了Java實(shí)現(xiàn)threadLocal線程池獲取,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07

