spring?boot使用@Async注解解決異步多線程入庫的問題
前言
在開發(fā)過程中,我們會遇到很多使用線程池的業(yè)務(wù)場景,例如定時任務(wù)使用的就是ScheduledThreadPoolExecutor。而有些時候使用線程池的場景就是會將一些可以進行異步操作的業(yè)務(wù)放在線程池中去完成,例如在生成訂單的時候給用戶發(fā)送短信,生成訂單的結(jié)果不應(yīng)該被發(fā)送短信的成功與否所左右,也就是說生成訂單這個主操作是不依賴于發(fā)送短信這個操作,所以我們就可以把發(fā)送短信這個操作置為異步操作。而要想完成異步操作,一般使用的一個是消息服務(wù)器MQ,一個就是線程池。今天我們就來看看在Java中常用的Spring框架中如何去使用線程池來完成異步操作,以及分析背后的原理。
在Spring4中,Spring中引入了一個新的注解@Async,這個注解讓我們在使用Spring完成異步操作變得非常方便。
在SpringBoot環(huán)境中,要使用@Async注解,我們需要先在啟動類上加上@EnableAsync注解。這個與在SpringBoot中使用@Scheduled注解需要在啟動類中加上@EnableScheduling是一樣的道理(當(dāng)然你使用古老的XML配置也是可以的,但是在SpringBoot環(huán)境中,建議的是全注解開發(fā)),具體原理下面會分析。加上@EnableAsync注解后,如果我們想在調(diào)用一個方法的時候開啟一個新的線程開始異步操作,我們只需要在這個方法上加上@Async注解,當(dāng)然前提是,這個方法所在的類必須在Spring環(huán)境中。
項目實況介紹
項目中,我需要將700w條數(shù)據(jù),定時任務(wù)加入到mysql表中,去掉日志打印和一些其他因素的影響,入庫時間還是需要8個小時以上,嚴(yán)重影響后續(xù)的一系列操作,所以我才用@Async注解,來實現(xiàn)異步入庫,開了7個線程,入庫時間縮短為1.5個小時,大大提高效率,以下是詳細(xì)介紹,一級一些需要注意的坑.
需要寫個配置文件兩種方式
第一種方式
@Configuration
@EnableAsync //啟用異步任務(wù)
public class ThreadConfig {
@Bean
public ThreadPoolTaskExecutor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(15);
//配置最大線程數(shù)
executor.setMaxPoolSize(30);
//配置隊列大小
executor.setQueueCapacity(1000);
//線程的名稱前綴
executor.setThreadNamePrefix("Executor-");
//線程活躍時間(秒)
//executor.setKeepAliveSeconds(60);
//等待所有任務(wù)結(jié)束后再關(guān)閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true);
//設(shè)置拒絕策略
//executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執(zhí)行初始化
executor.initialize();
return executor;
}
}
第二種方式
@Configuration
@EnableAsync
public class ExecutorConfig {
@Value("${thread.maxPoolSize}")
private Integer maxPoolSize;
@Value("${thread.corePoolSize}")
private Integer corePoolSize;
@Value("${thread.keepAliveSeconds}")
private Integer keepAliveSeconds;
@Value("${thread.queueCapacity}")
private Integer queueCapacity;
@Bean
public ThreadPoolTaskExecutor asyncExecutor(){
ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);//核心數(shù)量
taskExecutor.setMaxPoolSize(maxPoolSize);//最大數(shù)量
taskExecutor.setQueueCapacity(queueCapacity);//隊列
taskExecutor.setKeepAliveSeconds(keepAliveSeconds);//存活時間
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);//設(shè)置等待任務(wù)完成后線程池再關(guān)閉
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//設(shè)置拒絕策略
taskExecutor.initialize();//初始化
return taskExecutor;
}
}配置文件
#線程池 thread: corePoolSize: 5 maxPoolSize: 10 queueCapacity: 100 keepAliveSeconds: 3000
springboot默認(rèn)是不開啟異步注解功能的,所以,要讓springboot中識別@Async,則必須在入口文件中,開啟異步注解功能
package com.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
//開啟異步注解功能
@EnableAsync
@SpringBootApplication
public class SpringbootTaskApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootTaskApplication.class, args);
}
}這里有個坑!
如果遇到報錯:需要加上 proxyTargetClass = true
The bean 'xxxService' could not be injected as a'com.xxxx.xxx.xxxService' because it is a JDK dynamic proxy that implements:
xxxxxx
Action:
Consider injecting the bean as one of its interfaces orforcing the use of CGLib-based proxiesby setting proxyTargetClass=true on @EnableAsync and/or @EnableCaching.
當(dāng)我service層處理完邏輯,吧list分成7個小list然后調(diào)用異步方法(異步方法的參數(shù)不用管,沒影響,只截取核心代碼)
List<List<DistributedPredictDTO>> partition = Lists.partition(userList, userList.size() / 7);
for (List<DistributedPredictDTO> distributedPredictDTOS : partition) {
//調(diào)用異步方法
threadService.getI(beginDate, endDate, tableName, distributedPredictDTOS, hMap, i);
}
@Slf4j
@Service
public class ThreadServiceImpl {
@Resource
ResourcePoolUrlProperties properties;
@Resource
private MonitorDao monitorDao;
@Async
Integer getI(String beginDate, String endDate, String tableName, List<DistributedPredictDTO> userList, Map<String, String> hMap, int i) {
log.info("我開始執(zhí)行");
for (DistributedPredictDTO e : userList) {
String responseStr;
HashMap<String, String> pMap = Maps.newHashMap();
pMap.put("scheduleId", e.getScheduleId());
pMap.put("scheduleName", e.getScheduleName());
pMap.put("distribsunStationId", e.getLabel());
pMap.put("distribsunStationName", e.getValue());
pMap.put("beginTime", beginDate);
pMap.put("endTime", endDate);
try {
if ("180".equals(properties.getNewPowerSys().getDistributedPredictUrl().substring(17, 20))) {
pMap = null;
}
responseStr = HttpClientUtil.doPost(properties.getNewPowerSys().getDistributedPredictUrl(), hMap, pMap);
} catch (Exception exception) {
throw new RuntimeException(e.getValue() + "的功率預(yù)測接口異常" + hMap + pMap);
}
if (org.springframework.util.StringUtils.isEmpty(responseStr)) {
log.info(e + "數(shù)據(jù)為空");
continue;
}
JSONObject resJson = JSONObject.parseObject(responseStr);
JSONObject obj = (JSONObject) resJson.get("obj");
JSONArray tableData = (JSONArray) obj.get("tabledata");
final List<DistributedUserPower> userPowers = Lists.newArrayList();
for (Object o : tableData) {
final DistributedUserPower distributedUserPower = new DistributedUserPower();
distributedUserPower.setData(((JSONObject) o).get("data").toString());
distributedUserPower.setData2(((JSONObject) o).get("data2").toString());
distributedUserPower.setDataTime(((JSONObject) o).get("time").toString());
distributedUserPower.setUserId(e.getLabel());
distributedUserPower.setUserName(e.getValue());
distributedUserPower.setAreaName(e.getScheduleName());
distributedUserPower.setCreateTime(DateUtils.getDate());
userPowers.add(distributedUserPower);
}
monitorDao.saveBatch(userPowers, tableName);
i++;
}
return i;
}這里有兩個坑!
第一個坑:
我調(diào)用的異步方法在當(dāng)前類中,則直接導(dǎo)致
@Async注解失效
正確操作,異步方法不要和同步調(diào)用方法寫在同一個類中,應(yīng)該重新調(diào)用其他類
第二個坑:
如果出現(xiàn)這個報錯:
Null return value from advice does not mat
問題分析
代碼中采用異步調(diào)用,AOP 做來一層切面處理,底層是通過 JDK 動態(tài)代理實現(xiàn)
不管采用 JDK 還是 CGLIB 代理,返回值必須是包裝類型,所以才會導(dǎo)致上訴的報錯信息
處理方案
將異步方法的返回值修改為基本類型的對應(yīng)包裝類型即可,如 int -> Integer
5分鐘測試效果圖:
最后一張是7線程:

總結(jié)
到此這篇關(guān)于spring boot使用@Async注解解決異步多線程入庫問題的文章就介紹到這了,更多相關(guān)springboot @Async異步多線程入庫內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringCloud解決Feign異步回調(diào)問題(SpringBoot+Async+Future實現(xiàn))
- Springboot?配置線程池創(chuàng)建線程及配置?@Async?異步操作線程池詳解
- 詳解springboot通過Async注解實現(xiàn)異步任務(wù)及回調(diào)的方法
- Spring Boot之@Async異步線程池示例詳解
- SpringBoot異步使用@Async的原理以及線程池配置詳解
- 使用Spring開啟@Async異步方式(javaconfig配置)
- Spring里的Async注解實現(xiàn)異步操作的方法步驟
- 詳解Spring/Spring boot異步任務(wù)編程WebAsyncTask
- Spring中使用Async進行異步功能開發(fā)實戰(zhàn)示例(大文件上傳為例)
相關(guān)文章
淺析Java中XPath和JsonPath以及SpEL的用法與對比
XPath,即XML路徑語言,是一種用于在XML文檔中查找信息的語言,JsonPath是從XPath中發(fā)展而來的,專門用于JSON數(shù)據(jù)格式,本文主要來講講他們的用法與區(qū)別,需要的可以參考下2023-11-11
SpringCloud Webflux過濾器增加header傳遞方式
這篇文章主要介紹了SpringCloud Webflux過濾器增加header傳遞方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02
java如何將int數(shù)組轉(zhuǎn)化為Integer數(shù)組
這篇文章主要介紹了java如何將int數(shù)組轉(zhuǎn)化為Integer數(shù)組,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-11-11
Spring?Boot?使用?SSE?方式向前端推送數(shù)據(jù)詳解
這篇文章主要介紹了Spring?Boot?使用SSE方式向前端推送數(shù)據(jù)詳解,SSE簡單的來說就是服務(wù)器主動向前端推送數(shù)據(jù)的一種技術(shù),它是單向的,也就是說前端是不能向服務(wù)器發(fā)送數(shù)據(jù)的2022-08-08

