Java中CompletableFuture四種調用模式的實現
三種調用模式
CompletableFuture(以下簡稱CF)提供了三種調用模式,分別是就地執(zhí)行、異步使用默認執(zhí)行器執(zhí)行、異步指定執(zhí)行器執(zhí)行。
就地執(zhí)行指的是回調在當前線程中執(zhí)行,調用thenApply、thenCompose等方法時,如果當前 CF 已經執(zhí)行完成,會立即執(zhí)行回調,稱為當前執(zhí)行,執(zhí)行時刻為當前(now);如果未完成,則由完成 CF 的線程執(zhí)行。
如下分別是立即執(zhí)行和異步執(zhí)行的例子:
var cf = CompletableFuture.completed(1); cf.thenApply(x -> x + 1) .thenRun(x -> System.out.println(x)) .join();
以上代碼全程同步。
var cf = new CompletableFuture<Integer>(); cf.thenApply(x -> x + 1) .thenRun(x -> System.out.println(x)); new Thread(() -> cf.complete(1)).start(); Uninterruptible.sleep(1, TimeUnit.SECONDS);
thenApply、thenRun在調用時,cf未完成,無法立刻執(zhí)行,其執(zhí)行在完成cf的線程,也就是新創(chuàng)建的線程中。
異步執(zhí)行指的是回調任務的執(zhí)行必定在執(zhí)行器中執(zhí)行,默認執(zhí)行器為Java提供的commonPool線程池,當然也可以通過重寫defaultExecutor實現調用指定的線程池。
var cf = CompletableFuture.completed(1); cf.thenApplyAsync(x -> x + 1) .thenRunAsync(x -> System.out.println(x)) .join(); Uninterruptible.sleep(1, TimeUnit.SECONDS);
以上代碼中打印操作在公共線程池中執(zhí)行。
比較
就地執(zhí)行性能最好,可以完全避免線程上下文切換,適合執(zhí)行一些輕量級任務。缺點是使用不當時,會阻塞當前線程;可能會造成“線程泄露”,導致線程池中的線程沒有及時歸還。
異步執(zhí)行反之。
第 4 種調用模式
線程池中任務執(zhí)行有一條原則:盡最大努力交付。意思是如果任務提交時沒有拒絕,沒有拋出拒絕執(zhí)行等異常,通常來說通過了信號量、限流器、執(zhí)行時間、線程數等諸多限制,后續(xù)的執(zhí)行應該不作額外限制,且努力完成;而不是等執(zhí)行過程中再拋出類似拒絕服務等異常。反過來說,如果當前任務提交時,任務不能執(zhí)行,就應該拒絕執(zhí)行。這條簡單的原則可以避免考慮復雜的問題,比如反壓、取消機制等,也能夠應對大多數的業(yè)務場景。
對于非輕量級任務,例如 A -> B,表示任務A執(zhí)行完成后執(zhí)行任務B,常規(guī)的線程池實現有一個問題,B任務的提交不一定立即執(zhí)行,可能遇到排隊(進入阻塞隊列)甚至超時等情況,最終導致整個任務的滯后。此時如果能就地執(zhí)行最好。
如果選擇就地執(zhí)行策略,解決了以上問題,但是可能會導致CF已完成后執(zhí)行的當前線程阻塞。這時最好有執(zhí)行器執(zhí)行任務,而不是占用當前線程。
最近CFFU類庫提供LLCF#relayAsync0,完美解決了以上痛點。LL表示low level,對于其的正確使用要求開發(fā)人員對CompletableFuture有著充分的理解。relay的含義是接力,這里指的是
- relay Async 接力異步
- Async 詞尾,保證一定是異步(和CF命名表義 一樣)
異步時(不阻塞調用邏輯),用前個computation的線程接力執(zhí)行,不使用新線程,避免了上下文切換開銷。
例子
relayAsync0 簽名如下:
public static <T, F extends CompletionStage<?>> F relayAsync0(
CompletionStage<? extends T> cfThis,
Function<CompletableFuture<T>, F> relayComputations, Executor executor)
需要注意傳入的回調任務不是普通的Function,而是入參CF,出參 CompletionStage,也就是說我們需要傳入對CF的回調。比如:
cf -> cf.thenApply(...) cf -> cf.thenCompose(...) cf -> cf.thenRun(...)
該方法使用時和thenApplyAsync很像,只不過由實例方法調用改成了靜態(tài)方法調用,回調參數為對CF的回調。
以下代碼引用自CFFU作者 李鼎 | Jerry Lee,詳細說明四種調用模式的用法:
public class RelayAsyncDescriptionAndExample {
static void executeComputationsOfNewStage(CompletableFuture<String> cf) {
// ================================================================================
// Default execution
// ================================================================================
cf.thenApply(s -> {
// a simulating long-running computation...
sleep(1000);
// if input cf is COMPLETED when computations execute,
// executes the long time computation SYNCHRONOUSLY (aka. in the caller thread);
// this SYNCHRONIZED execution leads to BLOCKing sequential codes of caller... ??
return s + s;
});
// ================================================================================
// Asynchronous execution of CompletableFuture(default executor or custom executor)
// ================================================================================
cf.thenApplyAsync(s -> {
// a simulating long-running computation...
sleep(1000);
// always executes via an executor(guarantees not to block sequential code of caller).
// if input cf is INCOMPLETE when computations execute,
// the execution via an executor leads to ONE MORE thread switching. ??
return s + s;
});
// ================================================================================
// How about the fourth way to arrange execution of a new stage's computations?
// ================================================================================
//
// - if input cf is COMPLETED when computations execute, use "asynchronous execution" (via supplied Executor),
// won't block sequential code of caller ?
// - otherwise, use "default execution", save one thread switching ?
//
// Let's call this way as "relay async".
LLCF.relayAsync0(cf, f -> f.thenApply(s -> {
// a simulating long-running computation...
sleep(1000);
// if input cf is COMPLETED, executes via supplied executor
// if input cf is INCOMPLETE, use "default execution"
return s + s;
}), ForkJoinPool.commonPool());
}
}
實現分析
public static <T, F extends CompletionStage<?>> F relayAsync0(
CompletionStage<? extends T> cfThis,
Function<CompletableFuture<T>, F> relayComputations, Executor executor) {
final CompletableFuture<T> promise = new CompletableFuture<>();
final F ret = relayComputations.apply(promise);
final Thread callerThread = currentThread();
final boolean[] returnedFromPeek0 = {false};
LLCF.peek0(cfThis, (v, ex) -> {
if (currentThread().equals(callerThread) && !returnedFromPeek0[0]) {
// If the action is running in the caller thread(single same thread) and `peek0` invocation does not
// return to caller(flag returnedFromPeek0 is false), the action is being executed synchronously.
// To prevent blocking the caller's sequential code, use the supplied executor to complete the promise.
executor.execute(() -> completeCf0(promise, v, ex));
} else {
// Otherwise, complete the promise directly, avoiding one thread switching.
completeCf0(promise, v, ex);
}
}, "relayAsync0");
returnedFromPeek0[0] = true;
return ret;
}
說明:
- completeCf0方法可以將結果v或者異常ex設置到promise中
- peek0 近似等效于 whenComplete
分析:
- 可以通過引入新的CF,也就是 promise 實現線程傳遞,其他線程“完成”promise時,這個線程隱式傳到了promise中,可以理解成隱式上下文。任何一個CF都帶有一個隱式上下文。
- returnedFromPeek0 避免了異步調用但是恰好是同線程的問題,此時也應該實現relay語義,因為我們的目的是避免對當前線程的阻塞。returnedFromPeek0 天然線程安全,因為其訪問總是在一個確定的線程內。
- else 代碼塊:就地執(zhí)行,避免線程切換。
總結
到此這篇關于Java中CompletableFuture四種調用模式的實現的文章就介紹到這了,更多相關Java CompletableFuture 調用模式內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring Cloud Config分布式配置中心使用介紹詳解
分布式配置中心應用場景 往往,我們使用配置文件管理?些配置信息,比如application.yml 單體應用架構:配置信息的管理、維護并不會顯得特別麻煩,手動操作就可以,因為就一個工程2022-09-09
SpringBoot ThreadLocal 簡單介紹及使用詳解
ThreadLocal 叫做線程變量,意思是 ThreadLocal 中填充的變量屬于當前線程,該變量對其他線程而言是隔離的,也就是說該變量是當前線程獨有的變量,這篇文章主要介紹了SpringBoot ThreadLocal 的詳解,需要的朋友可以參考下2024-01-01
在SpringBoot中利用RocketMQ實現批量消息消費功能
RocketMQ 是一款分布式消息隊列,支持高吞吐、低延遲的消息傳遞,對于需要一次處理多條消息的場景,RocketMQ 提供了批量消費的機制,這篇文章將展示如何在 Spring Boot 中實現這一功能,感興趣的小伙伴跟著小編一起來看看吧2024-11-11

