Java中的異步操作CompletableFuture示例詳解
一、CompletableFuture概述
CompletableFuture? 是 Java 8 引入的一個(gè)功能強(qiáng)大的異步編程工具,它實(shí)現(xiàn)了Future和CompletionStage接口,提供了更靈活、更強(qiáng)大的異步任務(wù)編排能力。相比傳統(tǒng)的Future,它支持非阻塞的鏈?zhǔn)秸{(diào)用和組合操作。
二、CompletableFuture的核心特性
異步任務(wù)的創(chuàng)建與執(zhí)行
異步任務(wù)是CompletableFuture的基礎(chǔ)單元。關(guān)鍵在于區(qū)分任務(wù)的發(fā)起和結(jié)果的獲取是兩個(gè)獨(dú)立的時(shí)機(jī),這正是異步的本質(zhì)。
//最簡單的異步調(diào)用,默認(rèn)使用 ForkJoinPool.commonPool()
CompletableFuture<Integer> computeFuture = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000); // 模擬耗時(shí)計(jì)算
return 42;
});CompletableFuture底層是使用線程池幫我們?nèi)ネ瓿僧惒降囊粋€(gè)調(diào)用,所以,我們也可以自定義線程池來實(shí)現(xiàn)異步操作。
//自定義線程池
ExecutorService bizExecutor = Executors.newFixedThreadPool(5, r -> {
Thread t = new Thread(r);
t.setName("BizExecutor-" + t.getId());
return t;
});實(shí)際開發(fā)中,強(qiáng)烈建議始終指定自定義線程池,避免業(yè)務(wù)代碼占用公共線程池影響系統(tǒng)穩(wěn)定性。
任務(wù)結(jié)果的鏈?zhǔn)教幚?/h3>
這是CompletableFuture最優(yōu)雅的特性,它將回調(diào)地獄轉(zhuǎn)換為流暢的管道。其中有三個(gè)重要的方法
- thenApply:接受輸入,產(chǎn)生輸出,實(shí)際用作轉(zhuǎn)換函數(shù)
- thenAccept:接受輸入,無輸出,作為數(shù)據(jù)的消費(fèi)者
- thenRun:無輸入,無輸出,可用作通知等操作
// 電商訂單處理流水線示例
CompletableFuture<Order> orderFuture = CompletableFuture
// 階段1:創(chuàng)建訂單(有返回值)
.supplyAsync(() -> orderService.createOrder(request), orderExecutor)
// 階段2:驗(yàn)證庫存(轉(zhuǎn)換結(jié)果)
.thenApplyAsync(order -> {
inventoryService.checkStock(order.getItems());
return order.markAsValidated();
}, inventoryExecutor)
// 階段3:扣減庫存(消費(fèi)結(jié)果,無返回)
.thenAcceptAsync(order -> {
inventoryService.deductStock(order.getItems());
logger.info("庫存扣減完成,訂單號(hào):{}", order.getId());
}, inventoryExecutor)
// 階段4:發(fā)送通知(純副作用操作)
.thenRunAsync(() -> {
notificationService.sendOrderCreated();
}, notificationExecutor);帶有Async后綴的方法(如thenApplyAsync)會(huì)在新線程中執(zhí)行,而不帶后綴的會(huì)在前一個(gè)任務(wù)完成的線程中執(zhí)行。在IO密集型場(chǎng)景使用Async版本可提高并發(fā)度。
多個(gè)任務(wù)組合
現(xiàn)實(shí)業(yè)務(wù)中很少有單一異步任務(wù),更多是多個(gè)任務(wù)的協(xié)同。CompletableFuture提供了豐富的組合模式。
- thenCombine:等待兩個(gè)都完成,然后合并
- thenCompose: 前一個(gè)完成后再開始下一個(gè)
- allOf: 等待所有(3+個(gè))任務(wù)完成
// 場(chǎng)景:用戶詳情頁需要聚合多個(gè)服務(wù)的數(shù)據(jù)
public CompletableFuture<UserProfile> getUserProfile(String userId) {
// 并行調(diào)用三個(gè)獨(dú)立服務(wù)
CompletableFuture<UserInfo> userInfoFuture =
userService.getUserInfoAsync(userId);
CompletableFuture<List<Order>> ordersFuture =
orderService.getUserOrdersAsync(userId);
CompletableFuture<List<Address>> addressesFuture =
addressService.getUserAddressesAsync(userId);
// thenCombine等待兩個(gè)都完成,然后合并
CompletableFuture<UserWithOrders> userWithOrders = userInfoFuture
.thenCombine(ordersFuture, (user, orders) -> {
return new UserWithOrders(user, orders);
});
// thenCompose前一個(gè)完成后再開始下一個(gè)
CompletableFuture<String> personalizedGreeting = userInfoFuture
.thenCompose(user ->
greetingService.getGreetingAsync(user.getLanguage(), user.getName())
);
// allOf 等待所有(3+個(gè))任務(wù)完成
return CompletableFuture.allOf(
userWithOrders,
addressesFuture,
personalizedGreeting
)
.thenApply(v -> {
// 所有都完成后的聚合
return new UserProfile(
userWithOrders.join().getUser(),
userWithOrders.join().getOrders(),
addressesFuture.join(),
personalizedGreeting.join()
);
});
}這里有個(gè)要注意的點(diǎn):allOf本身返回的是 CompletableFuture<Void>,不包含各子任務(wù)的結(jié)果。需要像上面示例中通過 join()或 get()來獲取,但注意這不會(huì)導(dǎo)致死鎖,因?yàn)榇藭r(shí)所有子任務(wù)已確定完成。
異常處理
異步中的異常處理比同步更復(fù)雜,因?yàn)楫惓:蜆I(yè)務(wù)代碼在時(shí)間、空間上都是解耦的。CompletableFuture 的異常處理是功能型、非侵入式的。
// 健壯的數(shù)據(jù)處理管道
CompletableFuture<Report> reportFuture = CompletableFuture
.supplyAsync(() -> dataFetcher.fetchData(), dataExecutor)
// exceptionally異常時(shí)提供降級(jí)值
.exceptionally(ex -> {
log.warn("數(shù)據(jù)獲取失敗,使用緩存", ex);
return cacheService.getCachedData();
})
// 對(duì)數(shù)據(jù)做處理,同時(shí)捕獲處理中的異常
.thenApplyAsync(data -> {
try {
return dataProcessor.process(data);
} catch (ProcessingException e) {
throw new CompletionException("處理失敗", e);
}
}, processExecutor)
// handle統(tǒng)一處理成功和失敗
.handle((processedData, ex) -> {
if (ex != null) {
// 失敗路徑
metrics.recordFailure(ex);
return Report.errorReport("系統(tǒng)繁忙,請(qǐng)稍后重試");
} else {
// 成功路徑
return Report.successReport(processedData);
}
})
// whenComplete無論成功失敗都執(zhí)行(類似finally)
.whenComplete((report, ex) -> {
// 資源清理、日志記錄等
cleanResources();
log.info("報(bào)告生成完成,狀態(tài):{}",
ex == null ? "成功" : "失敗");
});exceptionally和handle 的區(qū)別:
- exceptionally:只在異常時(shí)觸發(fā),必須返回一個(gè)恢復(fù)值
- handle:無論成功失敗都觸發(fā),通過第二個(gè)參數(shù)判斷狀態(tài)
建議在異步鏈的末尾使用 handle或 whenComplete做統(tǒng)一的最終處理,而不是在每個(gè)階段都捕獲異常。
三、性能優(yōu)化與注意事項(xiàng)
避免阻塞主線程
在Web請(qǐng)求線程中調(diào)用get()會(huì)阻塞整個(gè)請(qǐng)求
錯(cuò)誤寫法:
public Result handleRequest() {
CompletableFuture<Data> future = fetchDataAsync();
return future.get(); // 阻塞,浪費(fèi)容器線程
}接下來展示正確寫法:
//完全異步
public CompletableFuture<Result> handleRequestAsync() {
return fetchDataAsync()
.thenApply(data -> convertToResult(data))
.exceptionally(ex -> Result.error("處理失敗"));
}合理使用線程池資源
線程池隔離原則:不同業(yè)務(wù)、不同重要級(jí)別的任務(wù)使用不同的線程池,避免相互影響。
根據(jù)任務(wù)類型的不同配置不同的線程池參數(shù)
public class ThreadPoolConfig {
// CPU密集型:核心數(shù)+1
@Bean("cpuIntensivePool")
public ExecutorService cpuIntensivePool() {
int coreSize = Runtime.getRuntime().availableProcessors() + 1;
return new ThreadPoolExecutor(
coreSize, coreSize * 2, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("cpu-pool")
);
}
// IO密集型:可設(shè)置較大線程數(shù)
@Bean("ioIntensivePool")
public ExecutorService ioIntensivePool() {
return new ThreadPoolExecutor(
20, 100, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), // 無界隊(duì)列,快速響應(yīng)
new NamedThreadFactory("io-pool")
);
}
// 定時(shí)/超時(shí)任務(wù):使用ScheduledExecutor
@Bean("timeoutPool")
public ScheduledExecutorService timeoutPool() {
return Executors.newScheduledThreadPool(5,
new NamedThreadFactory("timeout-pool"));
}
}四、總結(jié)
CompletableFuture 不僅僅是API的增強(qiáng),更是編程范式的轉(zhuǎn)變。
核心價(jià)值:
- 聲明式異步:從"如何做"到"做什么"的轉(zhuǎn)變
- 組合式抽象:將并發(fā)復(fù)雜性封裝在簡潔的API之后
- 函數(shù)式融合:將函數(shù)式編程與并發(fā)編程結(jié)合
正確使用CompletableFuture,可以構(gòu)建出既高性能又易于維護(hù)的異步系統(tǒng)。
到此這篇關(guān)于Java中的異步操作CompletableFuture示例詳解的文章就介紹到這了,更多相關(guān)java異步CompletableFuture內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java 中的 CompletableFuture如何讓異步編程變得簡單
- Java CompletableFuture之異步執(zhí)行、鏈?zhǔn)秸{(diào)用、組合多個(gè)Future、異常處理和超時(shí)控制等詳解
- Java8 CompletableFuture異步編程解讀
- Java異步線程中的CompletableFuture與@Async詳解
- Java中的CompletableFuture異步編程詳解
- java中CompletableFuture異步執(zhí)行方法
- Java?CompletableFuture實(shí)現(xiàn)多線程異步編排
- Java8通過CompletableFuture實(shí)現(xiàn)異步回調(diào)
- Java?8?的異步編程利器?CompletableFuture的實(shí)例詳解
相關(guān)文章
生成8位隨機(jī)不重復(fù)的數(shù)字編號(hào)的方法
生成隨機(jī)不重復(fù)的數(shù)字編號(hào)在某些情況下也會(huì)用到,本文以生成8位隨機(jī)不重復(fù)的數(shù)字編號(hào)為例與大家分享下具體的實(shí)現(xiàn)過程,感興趣的朋友可以參考下2013-09-09
SpringBoot使用AOP,內(nèi)部方法失效的解決方案
這篇文章主要介紹了SpringBoot使用AOP,內(nèi)部方法失效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
關(guān)于@JSONField和@JsonFormat的使用區(qū)別說明
這篇文章主要介紹了關(guān)于@JSONField 和 @JsonFormat的區(qū)別說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
Java class文件格式之常量池_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了Java class文件格式之常量池的相關(guān)資料,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06
Java KindEditor粘貼圖片自動(dòng)上傳到服務(wù)器功能實(shí)現(xiàn)
這篇文章主要介紹了Java KindEditor粘貼圖片自動(dòng)上傳到服務(wù)器功能實(shí)現(xiàn),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04
java解決單緩沖生產(chǎn)者消費(fèi)者問題示例
這篇文章主要介紹了java解單緩沖生產(chǎn)者消費(fèi)者問題示例,需要的朋友可以參考下2014-04-04

