CompletableFuture?異步編排示例詳解
從Future聊起
Future是java 1.5引入的異步編程api,它表示一個異步計算結(jié)果,提供了獲取異步結(jié)果的能力,解決了多線程場景下Runnable線程任務(wù)無法獲取結(jié)果的問題。
但是其獲取異步結(jié)果的方式并不夠優(yōu)雅,我們必須使用Future.get的方式阻塞調(diào)用線程,或者使用輪詢方式判斷 Future.isDone 任務(wù)是否結(jié)束,再獲取結(jié)果。
public interface Future<V> {
//任務(wù)是否完成
boolean isDone();
//阻塞調(diào)用線程獲取異步結(jié)果
V get() throws InterruptedException, ExecutionException;
//在指定時間內(nèi)阻塞線程獲取異步結(jié)果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
假如存在多個異步任務(wù)相互依賴,一個或多個異步線程任務(wù)需要依賴上一個異步線程任務(wù)結(jié)果,并且多個異步任務(wù)能夠組合結(jié)果,顯然這種阻塞線程的方式并不能優(yōu)雅解決。
我們更希望能夠提供一種異步回調(diào)的方式,組合各種異步任務(wù),而無需開發(fā)者對多個異步任務(wù)結(jié)果的監(jiān)聽編排。
為了解決優(yōu)化上述問題,java8 新增了CompletableFutureAPI ,其大大擴展了Future能力,并提供了異步任務(wù)編排能力。
CompletableFuture
CompletableFuture實現(xiàn)了新的接口CompletionStage,并擴展了Future接口。查看類圖

創(chuàng)建異步任務(wù)
CompletableFuture 提供了四種方法去創(chuàng)建一個異步任務(wù)。
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier):創(chuàng)建一個有返回值的異步任務(wù)實例static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor):創(chuàng)建一個有返回值的異步任務(wù)實例,可以指定線程池static CompletableFuture<Void> runAsync(Runnable runnable):創(chuàng)建一個無返回值的任務(wù)實例static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor):創(chuàng)建一個無返回值的任務(wù)實例,允許指定線程池
著幾個方法本質(zhì)上是有返回值和無返回值兩種類型方法,supply方法可以獲取異步結(jié)果,而run方法則無返回值,根據(jù)需要使用。
同時兩種類型的方法均提供了指定線程池的重載,如果不指定線程池會默認(rèn)使用ForkJoinPool.commonPool(),默認(rèn)線程數(shù)為cpu核心數(shù),建議使用自定義線程池的方式,避免線程資源競爭
一個簡單樣例
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println("無返回值任務(wù)"); });
runAsync.get();
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "hello completableFuture");
String result = supplyAsync.get();
System.out.println(result);
我們依然可以通過get()方法阻塞獲取異步結(jié)果任務(wù),但是CompletableFuture主要還是用于異步回調(diào)及異步任務(wù)編排使用。
異步回調(diào)
在任務(wù)執(zhí)行結(jié)束后我們希望能夠自動觸發(fā)回調(diào)方法,CompletableFuture提供了兩種方法實現(xiàn)。
CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action):當(dāng)上一階段任務(wù)執(zhí)行結(jié)束后,回調(diào)方法接受上一階段結(jié)果或者異常,返回上一階段任務(wù)結(jié)果<U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn):當(dāng)上一階段任務(wù)執(zhí)行結(jié)束后,回調(diào)方法接受上一階段結(jié)果或者異常,并最終返回回調(diào)方法處理結(jié)果CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn):上一階段任務(wù)出現(xiàn)異常后的回調(diào),返回結(jié)果是回調(diào)函數(shù)的返回結(jié)果。
whenComplete 與 handle 區(qū)別:兩者均接受上一階段任務(wù)結(jié)果或異常,但是whenComplete 回調(diào)中沒有返回值,所以其結(jié)果是上一階段任務(wù),而handle 最終返回的是其回調(diào)方法方法,其主要是BiConsumer與BiFunction的區(qū)別。
異步編排
CompletionStage表示異步計算的一個階段,當(dāng)一個計算處理完成后會觸發(fā)其他依賴的階段。當(dāng)然一個階段的觸發(fā)也可以是由多個階段的完成觸發(fā)或者多個中的任意一個完成觸發(fā)。該接口定義了異步任務(wù)編排的各種場景,CompletableFuture則實現(xiàn)了這些場景。
可以把這些場景大致分為三類:串行、AND和OR。下面會逐個分析各個場景,接口中定義的以Async結(jié)尾的方法,指下一階段任務(wù)會被單獨提交到線程池中執(zhí)行,后面不在贅述。
串行
當(dāng)上一階段任務(wù)執(zhí)行完畢后,繼續(xù)提交執(zhí)行其他任務(wù)
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn):接收上一階段任務(wù)結(jié)果,并可獲取返回值。CompletableFuture<Void> thenAccept(Consumer<? super T> action):接收上一階段任務(wù)結(jié)果,無返回值。CompletableFuture<Void> thenRun(Runnable action):不接收上一階段任務(wù)結(jié)果,并且無返回值。
T:上一個任務(wù)返回結(jié)果的類型 U:當(dāng)前任務(wù)的返回值類型
AND
組合多個異步任務(wù),當(dāng)多個任務(wù)執(zhí)行完畢繼續(xù)執(zhí)行其他任務(wù)
<U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn):上一階段任務(wù)與other任務(wù)均執(zhí)行結(jié)束,接收兩個任務(wù)的結(jié)果,并可獲取返回值<U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn): 使用上一階段任務(wù)的結(jié)果,返回一個新的CompletableFuture實例<U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action):上一階段任務(wù)與other任務(wù)均執(zhí)行結(jié)束,接收兩個任務(wù)的結(jié)果,無返回值CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action):上一階段任務(wù)與other任務(wù)均執(zhí)行結(jié)束,不接收兩個任務(wù)的結(jié)果,無返回值static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs):等待所有異步任務(wù)執(zhí)行結(jié)束
T:上一個任務(wù)返回結(jié)果的類型 U:上一個other任務(wù)的返回值類型 V:當(dāng)前任務(wù)返回值
OR
當(dāng)多個任務(wù)中任意任務(wù)執(zhí)行完成則繼續(xù)執(zhí)行其他任務(wù)。
<U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn): 接收上一階段任務(wù)與other任務(wù)最快執(zhí)行完成的結(jié)果,并可獲取返回值CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action):接收上一階段任務(wù)與other任務(wù)最快執(zhí)行完成的結(jié)果,無返回值CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action):上一階段任務(wù)與other任務(wù)任意任務(wù)完成執(zhí)行,不接收結(jié)果,無返回值static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs):組合多個任務(wù),返回最快執(zhí)行結(jié)束的任務(wù)結(jié)果
Future 機制擴展
CompletableFuture不僅實現(xiàn)了Future接口,同時對其進行了擴展,提供了更加優(yōu)雅的實現(xiàn)。
T join():與get()方法用法一致,阻塞調(diào)用線程獲取結(jié)果,但是不會拋出具體異常,簡化了使用上下文T getNow(T valueIfAbsent):當(dāng)任務(wù)結(jié)束返回任務(wù)結(jié)果,否則返回給定的結(jié)果valueIfAbsent。boolean complete(T value):當(dāng)任務(wù)未結(jié)束時設(shè)置給定的結(jié)果value并結(jié)束任務(wù),已結(jié)束的任務(wù)不會生效。boolean completeExceptionally(Throwable ex):當(dāng)任務(wù)未結(jié)束時設(shè)置異常結(jié)果并結(jié)束任務(wù),已結(jié)束的任務(wù)不會生效
CompletableFuture 實踐
我們通過CompletableFuture實現(xiàn)一個經(jīng)典的燒水程序。

我們可以把這個流程分為三個異步任務(wù)。
任務(wù)1:洗水壺->燒水
任務(wù)2:洗水壺->洗茶杯->拿茶葉
任務(wù)3:泡茶,需要等待任務(wù)1與任務(wù)2結(jié)束。
通過代碼模擬實現(xiàn)
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("洗水壺");
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return "水壺";
}).thenApply(e->{
System.out.println("燒水");
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return "熱水";
});
//洗水壺->洗水杯->拿茶葉
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("洗茶壺");
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return "茶壺";
}).thenApply(e->{
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("洗水杯");
return "水杯";
}).thenApply(e->{
System.out.println("拿茶葉");
return "茶葉";
});
//泡茶
CompletableFuture<String> task3 = task1.thenCombine(task2, (a, b) -> {
System.out.println("泡茶");
return "茶";
});
String tea = task3.join();
System.out.println(tea);以上就是CompletableFuture 異步編排示例詳解的詳細(xì)內(nèi)容,更多關(guān)于CompletableFuture 異步編排的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot多級緩存實現(xiàn)方案總結(jié)
所謂多級緩存,是指在整個系統(tǒng)架構(gòu)的不同系統(tǒng)層面進行數(shù)據(jù)緩存,以提升訪問速度,多級緩存就是為了解決項目服務(wù)中單一緩存使用不足的缺點,本文我們將給大家總結(jié)了SpringBoot多級緩存實現(xiàn)方案,需要的朋友可以參考下2023-08-08
解決Eclipse打開.java文件異常,提示用系統(tǒng)工具打開的問題
這篇文章主要介紹了解決Eclipse打開.java文件異常,提示用系統(tǒng)工具打開的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01
詳解Java異常處理中throw與throws關(guān)鍵字的用法區(qū)別
這篇文章主要介紹了詳解Java異常處理中throw與throws關(guān)鍵字的用法區(qū)別,這也是Java面試題目中的???需要的朋友可以參考下2015-11-11
SpringBoot使用TraceId進行日志追蹤的實現(xiàn)
本文主要介紹了SpringBoot使用TraceId進行日志追蹤的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-01-01
IntelliJ IDEA中折疊所有Java代碼,再也不怕大段的代碼了
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA中折疊所有Java代碼,再也不怕大段的代碼了,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10
java 中JDBC連接數(shù)據(jù)庫代碼和步驟詳解及實例代碼
這篇文章主要介紹了java 中JDBC連接數(shù)據(jù)庫代碼和步驟詳解及實例代碼的相關(guān)資料,需要的朋友可以參考下2017-02-02
Java如何處理數(shù)據(jù)成為樹狀結(jié)構(gòu)
這篇文章主要介紹了Java如何處理數(shù)據(jù)成為樹狀結(jié)構(gòu)問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-07-07

