java異步導(dǎo)出的實(shí)現(xiàn)過(guò)程
背景
假設(shè)我們有一個(gè)在線(xiàn)學(xué)習(xí)平臺(tái),管理員需要定期生成銷(xiāo)售報(bào)告,包括課程銷(xiāo)售情況和用戶(hù)購(gòu)買(mǎi)情況等重要數(shù)據(jù)。然而,由于數(shù)據(jù)量較大,生成報(bào)告可能需要較長(zhǎng)時(shí)間,并且可能會(huì)占用大量系統(tǒng)資源,從而影響用戶(hù)的使用體驗(yàn)。為了解決這個(gè)問(wèn)題,我們考慮采用異步導(dǎo)出的方案。
異步導(dǎo)出的工作原理是將導(dǎo)出操作放在一個(gè)異步任務(wù)中執(zhí)行,而不是立即在用戶(hù)發(fā)起導(dǎo)出請(qǐng)求后執(zhí)行導(dǎo)出操作。這樣一來(lái),用戶(hù)無(wú)需等待導(dǎo)出任務(wù)完成,就可以繼續(xù)進(jìn)行其他操作,而系統(tǒng)則在后臺(tái)完成導(dǎo)出任務(wù)。
這種方案有以下優(yōu)點(diǎn):
- 提高系統(tǒng)響應(yīng)速度: 用戶(hù)發(fā)起導(dǎo)出請(qǐng)求后,系統(tǒng)可以立即響應(yīng)而不必等待導(dǎo)出任務(wù)完成,從而提高了系統(tǒng)的響應(yīng)速度。
- 改善用戶(hù)體驗(yàn): 用戶(hù)無(wú)需等待導(dǎo)出任務(wù)完成,可以繼續(xù)使用系統(tǒng)進(jìn)行其他操作,這有助于提升用戶(hù)體驗(yàn)。
- 降低系統(tǒng)負(fù)載: 將耗時(shí)的導(dǎo)出操作放在異步任務(wù)中執(zhí)行,可以避免阻塞系統(tǒng)資源,從而降低系統(tǒng)的負(fù)載,確保其他用戶(hù)的操作不受影響。
異步導(dǎo)出在許多需要處理大量數(shù)據(jù)或耗時(shí)操作的場(chǎng)景中都非常有用,可以有效提升系統(tǒng)的性能和用戶(hù)體驗(yàn)。
數(shù)據(jù)庫(kù)設(shè)計(jì)
首先我們需要設(shè)計(jì)一個(gè)保存導(dǎo)出任務(wù)的表,需要記錄流轉(zhuǎn)狀態(tài)、操作人、任務(wù)參數(shù),后續(xù)任務(wù)的創(chuàng)建、導(dǎo)出完成/失敗都需要操作這張表
CREATE TABLE `t_export_task`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`task_id` varchar(50) NOT NULL COMMENT '任務(wù)id',
`task_type` tinyint(4) NOT NULL COMMENT '任務(wù)類(lèi)型',
`task_param` varchar(1000) NOT NULL COMMENT '任務(wù)參數(shù)',
`status` tinyint(3) NOT NULL DEFAULT 0 COMMENT '狀態(tài) 0-處理中 1-成功 -1失敗',
`file_url` varchar(500) DEFAULT NULL COMMENT '文件url',
`remark` varchar(200) DEFAULT NULL COMMENT '備注',
`create_user_id` int(11) NOT NULL COMMENT '操作人id',
`create_user_name` varchar(50) NOT NULL COMMENT '操作人名稱(chēng)',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時(shí)間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB COMMENT='導(dǎo)出任務(wù)記錄';代碼實(shí)現(xiàn)
導(dǎo)出工具類(lèi) :
負(fù)責(zé)提交導(dǎo)出任務(wù)、取消任務(wù)以及上傳導(dǎo)出文件到OSS服務(wù)器等功能。
- 導(dǎo)出任務(wù)線(xiàn)程池: 通過(guò)
ExecutorService線(xiàn)程池來(lái)執(zhí)行導(dǎo)出任務(wù),并確保線(xiàn)程池的單例化,防止重復(fù)創(chuàng)建,提高性能。 - 導(dǎo)出類(lèi)型對(duì)應(yīng)的任務(wù): 使用
Map<Integer, CompletableFuture<String>>來(lái)保存正在運(yùn)行的導(dǎo)出任務(wù),以便后續(xù)取消任務(wù)或跟蹤任務(wù)狀態(tài)。 - 提交導(dǎo)出任務(wù): 提交導(dǎo)出任務(wù)時(shí),先初始化線(xiàn)程池,然后使用
CompletableFuture.supplyAsync()方法執(zhí)行異步任務(wù),并在異步任務(wù)中生成導(dǎo)出文件,然后上傳到OSS服務(wù)器,最后返回導(dǎo)出文件的URL。 - 取消任務(wù): 取消任務(wù)時(shí),從保存的任務(wù)映射中獲取對(duì)應(yīng)的
CompletableFuture實(shí)例,并調(diào)用cancel()方法取消任務(wù)。 - 上傳文件到OSS服務(wù)器:
uploadToOSS()方法負(fù)責(zé)實(shí)際的文件上傳邏輯,將導(dǎo)出的文件上傳到OSS服務(wù)器,并返回文件的URL
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class ExportTaskUtil {
/**
* 導(dǎo)出任務(wù)線(xiàn)程池
*/
private static volatile ExecutorService executorService;
/**
* 導(dǎo)出類(lèi)型對(duì)應(yīng)的任務(wù)
*/
private final Map<Integer, CompletableFuture<String>> runningTasks = Maps.newConcurrentMap();
private final ExportTaskHandlerFactory exportTaskHandlerFactory;
private final FileUploadService fileUploadService;
/**
* @description 提交導(dǎo)出任務(wù)
* @author youmu
* @date 2024/1/26 17:58
* @param exportTask 導(dǎo)出任務(wù)
*/
public CompletableFuture<String> submit(ExportTask exportTask) {
// 初始化線(xiàn)程池
initThreadPool();
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
File exportFile = null;
// 獲取handler
ExportTaskHandler handler = exportTaskHandlerFactory.getHandler(exportTask.getTaskType());
// 生成文件
try {
exportFile = handler.generateExportFile(exportTask.getTaskParam());
if (exportFile == null) {
throw new BizException(CodeEnum.NOT_FOUND, "導(dǎo)出文件為空");
}
// 上傳文件到OSS服務(wù)器,獲取文件URL
return uploadToOSS(exportFile);
} catch (BizException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (exportFile != null) {
FileUtil.del(exportFile);
}
}
},executorService);
runningTasks.put(exportTask.getTaskType(), future);
return future;
}
private static void initThreadPool() {
if (executorService == null) {
synchronized (ExportTaskUtil.class) {
if (executorService == null) {
executorService = ThreadUtil.newFixedExecutor(4, "asyncExport", false);;
}
}
}
}
/**
* @description 取消任務(wù)
* @author youmu
* @date 2024/1/26 19:04
* @param exportTask 任務(wù)
*/
public void cancel(ExportTask exportTask) {
CompletableFuture<String> future = runningTasks.get(exportTask.getTaskType());
if (future != null && !future.isDone()) {
future.cancel(true);
}
}
/**
* @description 上傳文件到OSS服務(wù)器
* @author youmu
* @date 2024/1/29 16:56
*/
private String uploadToOSS(File exportFile) {
// 實(shí)現(xiàn)文件上傳邏輯,返回文件URL
return fileUploadService.uploadFileBySize(exportFile,"export/" + exportFile.getName());
}
}
導(dǎo)出任務(wù)處理的工廠(chǎng)類(lèi)以及相關(guān)的接口和枚舉定義
導(dǎo)出任務(wù)采用來(lái)工廠(chǎng)+策略的設(shè)計(jì)模式,工廠(chǎng)模式將對(duì)象的創(chuàng)建邏輯封裝到工廠(chǎng)類(lèi)中,策略模式將不同的行為封裝到不同的策略類(lèi)中,使得代碼具有良好的可擴(kuò)展性、靈活性和可維護(hù)性。
- ExportTaskHandlerFactory: 這是一個(gè)工廠(chǎng)類(lèi),用于根據(jù)導(dǎo)出任務(wù)類(lèi)型獲取對(duì)應(yīng)的任務(wù)處理器。在初始化時(shí),它會(huì)將所有實(shí)現(xiàn)了
ExportTaskHandler接口的處理器注入進(jìn)來(lái),并根據(jù)任務(wù)類(lèi)型建立起映射關(guān)系。 - ExportTaskHandler 接口: 這是一個(gè)導(dǎo)出任務(wù)處理器的接口,定義了生成導(dǎo)出文件和獲取導(dǎo)出任務(wù)類(lèi)型的方法,具體的導(dǎo)出任務(wù)處理器需要實(shí)現(xiàn)該接口。
- ExportTaskTypeEnum 枚舉: 這是一個(gè)枚舉類(lèi)型,定義了導(dǎo)出任務(wù)的類(lèi)型,包括了任務(wù)類(lèi)型的代碼和描述信息。
- OrderExportHandler 類(lèi): 這是一個(gè)具體的導(dǎo)出任務(wù)處理器的實(shí)現(xiàn)類(lèi),用于處理訂單導(dǎo)出任務(wù)。它實(shí)現(xiàn)了
ExportTaskHandler接口,根據(jù)具體業(yè)務(wù)邏輯生成導(dǎo)出文件,并提供了獲取任務(wù)類(lèi)型的方法。
/**
* @description ExportTaskHandler 工廠(chǎng)類(lèi)
* @author youmu
* @date 2024/1/26 18:04
*/
@Slf4j
@Component
public class ExportTaskHandlerFactory {
private final Map<Integer, ExportTaskHandler> handlerMap = Maps.newHashMap();
@Autowired
public ExportTaskHandlerFactory(List<ExportTaskHandler> handlers) {
for (ExportTaskHandler taskHandler : handlers) {
handlerMap.put(taskHandler.getExportType().getCode(), taskHandler);
}
}
public ExportTaskHandler getHandler(Integer exportType) {
return handlerMap.get(exportType);
}
}
/**
* @description ExportTaskHandler
* @author youmu
* @date 2024/1/26 18:06
*/
public interface ExportTaskHandler {
File generateExportFile(String param) throws Exception;
ExportTaskTypeEnum getExportType();
}
/**
* @description 導(dǎo)出任務(wù)類(lèi)型
* @author youmu
* @date 2024/1/29 11:01
*/
@AllArgsConstructor
@Getter
public enum ExportTaskTypeEnum implements IEnum<Integer, String> {
CROWD_PACKAGE(1, "人群包"),
;
private final Integer code;
private final String message;
}
public class OrderExportHandler implements ExportTaskHandler{
@Override
public File generateExportFile(String param) throws Exception {
return null;
}
@Override
public ExportTaskTypeEnum getExportType() {
return null;
}
}業(yè)務(wù)調(diào)用
導(dǎo)出任務(wù)的門(mén)面類(lèi) ExportTaskFacade,它提供了一系列方法來(lái)提交、取消、重試導(dǎo)出任務(wù),并提供了查詢(xún)導(dǎo)出任務(wù)的分頁(yè)接口。
- 提交任務(wù)(submitTask): 提交導(dǎo)出任務(wù)時(shí),根據(jù)是否傳入
taskId參數(shù)來(lái)判斷是新建任務(wù)還是更新任務(wù)。如果是新建任務(wù),則創(chuàng)建一個(gè)新的ExportTask實(shí)例并保存到數(shù)據(jù)庫(kù)中,然后調(diào)用doSubmit方法提交任務(wù);如果是更新任務(wù),則更新任務(wù)的狀態(tài)為正在處理,并調(diào)用doSubmit方法提交任務(wù)。 - 任務(wù)提交處理(doSubmit): 使用
exportTaskUtil.submit(exportTask)提交異步導(dǎo)出任務(wù),并定義了任務(wù)完成后的處理邏輯。如果任務(wù)執(zhí)行成功,則更新任務(wù)狀態(tài)為成功,并設(shè)置文件的URL;如果任務(wù)執(zhí)行失敗,則記錄失敗日志,并更新任務(wù)狀態(tài)為失敗,同時(shí)記錄異常信息。 - 取消任務(wù)(cancelTask): 根據(jù)傳入的
taskId獲取對(duì)應(yīng)的導(dǎo)出任務(wù),然后調(diào)用exportTaskUtil.cancel(exportTask)取消任務(wù)。 - 重試任務(wù)(retryTask): 根據(jù)傳入的
taskId獲取對(duì)應(yīng)的導(dǎo)出任務(wù),先取消任務(wù)以防止異常情況,然后重新提交任務(wù)。
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class ExportTaskFacade {
private final ExportTaskService exportTaskService;
private final UserService userService;
private final ExportTaskUtil exportTaskUtil;
public void submitTask(Integer exportType, String param) {
submitTask(null,exportType,param);
}
public void cancelTask(Long taskId) {
ExportTask exportTask = exportTaskService.getById(taskId);
AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"導(dǎo)出任務(wù)不存在"));
exportTaskUtil.cancel(exportTask);
}
public void retryTask(Long taskId) {
ExportTask exportTask = exportTaskService.getById(taskId);
AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"導(dǎo)出任務(wù)不存在"));
// 取消任務(wù),防止異常情況還在執(zhí)行
exportTaskUtil.cancel(exportTask);
// 提交任務(wù)
submitTask(taskId,exportTask.getTaskType(),exportTask.getTaskParam());
}
private void submitTask(Long taskId, Integer exportType, String param) {
ExportTask exportTask;
if(taskId == null) {
// 保存導(dǎo)出任務(wù)
exportTask = new ExportTask();
Integer userId = AuthInfoHolder.getUserId();
exportTask.setTaskId(CodeGenUtil.genCode(GenCodeTypeEnum.DL));
exportTask.setCreateUserId(userId);
exportTask.setCreateUserName(userService.findById(userId).getUserName());
exportTask.setTaskType(exportType);
exportTask.setTaskParam(param);
exportTaskService.save(exportTask);
} else {
// 更新導(dǎo)出任務(wù)
exportTask = exportTaskService.getById(taskId);
exportTaskService.lambdaUpdate()
.eq(ExportTask::getId, exportTask.getId())
.set(ExportTask::getStatus, ExportStatusEnum.PROCESSING.getCode())
.update();
}
doSubmit(exportTask);
}
private void doSubmit(ExportTask exportTask) {
exportTaskUtil.submit(exportTask).thenAccept(url->{
exportTaskService.lambdaUpdate()
.eq(ExportTask::getId, exportTask.getId())
.set(ExportTask::getStatus, ExportStatusEnum.SUCCESS.getCode())
.update();
}).exceptionally(ex->{
log.error("[導(dǎo)出任務(wù)]執(zhí)行失敗,{}", exportTask.getTaskId(),ex);
exportTaskService.lambdaUpdate()
.eq(ExportTask::getId, exportTask.getId())
.set(ExportTask::getStatus, ExportStatusEnum.FAILURE.getCode())
.set(ExportTask::getRemark, ex instanceof BizException ? ex.getMessage() : "未知異常")
.update();
return null;
});
}
public Page<ExportTaskVO> findPage(ExportTaskRequest request) {
Page<ExportTask> page = exportTaskService.findPage(request);
List<ExportTaskVO> voList = ConverterUtil.toVO(ExportTaskConverter.class, page.getRecords());
Page<ExportTaskVO> pageVO = new Page<>();
pageVO.setTotal(page.getTotal());
pageVO.setSize(page.getSize());
pageVO.setCurrent(page.getCurrent());
pageVO.setPages(page.getPages());
pageVO.setRecords(voList);
return pageVO;
}
}流程圖

總結(jié)
過(guò)以上實(shí)踐,我們成功實(shí)現(xiàn)了一個(gè)輕量級(jí)的異步導(dǎo)出方案,具有以下優(yōu)點(diǎn):
- 使用線(xiàn)程池管理異步任務(wù),確保了任務(wù)的并發(fā)執(zhí)行和資源的合理利用。
- 采用 CompletableFuture 實(shí)現(xiàn)異步導(dǎo)出和回調(diào)更新,簡(jiǎn)化了異步任務(wù)的編寫(xiě)和管理。
- 使用工廠(chǎng)模式和策略模式實(shí)現(xiàn)導(dǎo)出任務(wù)處理器,使得系統(tǒng)具有良好的可擴(kuò)展性和靈活性。
然而,這種方案也存在一些缺點(diǎn):
- 資源管理不足: 如果異步導(dǎo)出任務(wù)的并發(fā)量過(guò)大,而線(xiàn)程池的資源配置不足,則可能導(dǎo)致任務(wù)排隊(duì)等待執(zhí)行,影響任務(wù)的實(shí)時(shí)性和響應(yīng)速度。
- 任務(wù)執(zhí)行效率低下: 如果導(dǎo)出任務(wù)的處理時(shí)間過(guò)長(zhǎng),且線(xiàn)程池的工作線(xiàn)程數(shù)量有限,則可能導(dǎo)致任務(wù)執(zhí)行效率低下,無(wú)法及時(shí)完成任務(wù),影響系統(tǒng)的整體性能。
- 可靠性不高,無(wú)法保證任務(wù)一定會(huì)被執(zhí)行或執(zhí)行成功,特別是在系統(tǒng)故障或異常情況下。
針對(duì)這些缺點(diǎn),可以考慮以下優(yōu)化方案:
- 合理調(diào)整線(xiàn)程池配置: 根據(jù)系統(tǒng)的實(shí)際負(fù)載情況和性能需求,合理配置線(xiàn)程池的大小和工作線(xiàn)程數(shù)量,確保資源的有效利用和任務(wù)的及時(shí)執(zhí)行。
- 優(yōu)化任務(wù)處理邏輯: 對(duì)任務(wù)的處理邏輯進(jìn)行優(yōu)化,盡量減少任務(wù)的執(zhí)行時(shí)間和資源消耗,提高任務(wù)的執(zhí)行效率和響應(yīng)速度。
- 引入異步消息處理機(jī)制: 使用消息隊(duì)列或事件驅(qū)動(dòng)模型來(lái)實(shí)現(xiàn)任務(wù)的異步處理,進(jìn)一步解耦任務(wù)提交和任務(wù)執(zhí)行過(guò)程,提高系統(tǒng)的可擴(kuò)展性和靈活性。
- 引入定時(shí)任務(wù)調(diào)度器: 使用定時(shí)任務(wù)調(diào)度器(如 xxl-job)來(lái)定期掃描和重試執(zhí)行異常任務(wù)。當(dāng)任務(wù)執(zhí)行時(shí)間超過(guò)一定閾值(如2小時(shí))或者任務(wù)執(zhí)行異常時(shí),自動(dòng)觸發(fā)重試機(jī)制,保證任務(wù)的及時(shí)執(zhí)行。
- 增加任務(wù)監(jiān)控和告警機(jī)制: 實(shí)時(shí)監(jiān)控任務(wù)的執(zhí)行情況,當(dāng)發(fā)現(xiàn)任務(wù)執(zhí)行異?;虺瑫r(shí)時(shí),及時(shí)發(fā)送告警通知,以便運(yùn)維人員及時(shí)處理和修復(fù)。
通過(guò)以上優(yōu)化方案,可以提高異步導(dǎo)出方案的可靠性和穩(wěn)定性,確保任務(wù)能夠及時(shí)執(zhí)行并完成,同時(shí)降低了系統(tǒng)的維護(hù)成本和風(fēng)險(xiǎn)。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- java8 Future異步調(diào)用實(shí)現(xiàn)方式
- Java實(shí)現(xiàn)異步轉(zhuǎn)同步的多種方法介紹
- Java開(kāi)發(fā)異步編程中常用的接口和類(lèi)示例詳解
- 如何在Java Spring實(shí)現(xiàn)異步執(zhí)行(詳細(xì)篇)
- Java基于Log4j2實(shí)現(xiàn)異步日志系統(tǒng)的性能優(yōu)化實(shí)踐指南
- Java CompletableFuture之異步執(zhí)行、鏈?zhǔn)秸{(diào)用、組合多個(gè)Future、異常處理和超時(shí)控制等詳解
- java異步控制方法的超時(shí)時(shí)間問(wèn)題
- Java后端實(shí)現(xiàn)異步編程的9種方式總結(jié)
相關(guān)文章
Java使用POI實(shí)現(xiàn)Excel文件的創(chuàng)建與處理
這篇文章主要為大家詳細(xì)介紹了Java如何采用POI原生方式實(shí)現(xiàn)自定義Excel表格表頭并生成Excel文件,文中的示例代碼講解詳細(xì),有需要的小伙伴可以了解下2025-05-05
Mybatis用注解寫(xiě)in查詢(xún)的實(shí)現(xiàn)
這篇文章主要介紹了Mybatis用注解寫(xiě)in查詢(xún)的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
MyBatis學(xué)習(xí)筆記(二)之關(guān)聯(lián)關(guān)系
這篇文章主要介紹了MyBatis學(xué)習(xí)筆記(二)之關(guān)聯(lián)關(guān)系 的相關(guān)資料,需要的朋友可以參考下2016-02-02
java 遞歸查詢(xún)所有子節(jié)點(diǎn)id的方法實(shí)現(xiàn)
在多層次的數(shù)據(jù)結(jié)構(gòu)中,經(jīng)常需要查詢(xún)一個(gè)節(jié)點(diǎn)下的所有子節(jié)點(diǎn),本文主要介紹了java 遞歸查詢(xún)所有子節(jié)點(diǎn)id的方法實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03
Netty分布式高性能工具類(lèi)recycler的使用及創(chuàng)建
這篇文章主要為大家介紹了Netty分布式高性能工具類(lèi)recycler的使用和創(chuàng)建,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03
SpringCloud 服務(wù)負(fù)載均衡和調(diào)用 Ribbon、OpenFeign的方法
這篇文章主要介紹了SpringCloud 服務(wù)負(fù)載均衡和調(diào)用 Ribbon、OpenFeign的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09

