Java?CompletableFuture實(shí)現(xiàn)原理分析詳解
簡介
前面的一篇文章你知道Java8并發(fā)新特性CompletableFuture嗎?介紹了CompletableFuture的特性以及一些使用方法,今天我們主要來聊一聊CompletableFuture的回調(diào)功能以及異步工作原理是如何實(shí)現(xiàn)的。
CompletableFuture類結(jié)構(gòu)
1.CompletableFuture類結(jié)構(gòu)主要有兩個(gè)屬性
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
...
}- result:存儲CompletableFuture的返回,或者存儲異常對象AltResult。
- stack:是CompletableFuture.Completion對象,表示操作數(shù)棧棧頂,在進(jìn)行CompletableFuture鏈?zhǔn)秸{(diào)用的過程中,所有鏈?zhǔn)秸{(diào)用的CompletableFuture任務(wù)都會被壓入該stack中,在任務(wù)調(diào)用的過程按后進(jìn)先出的順序出棧執(zhí)行完所有任務(wù)。
2.stack屬性棧結(jié)構(gòu)
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // Treiber stack link
...
}next:存儲下一個(gè)任務(wù)鏈?zhǔn)秸{(diào)用棧。
3.UniCompletion的內(nèi)部結(jié)構(gòu)
abstract static class UniCompletion<T,V> extends Completion {
Executor executor; // executor to use (null if none)
CompletableFuture<V> dep; // the dependent to complete
CompletableFuture<T> src; // source for action
...
}UniCompletion繼承Completion類,包含以下幾個(gè)參數(shù):
- executor:異步任務(wù)執(zhí)行器,如果為空則有主線程執(zhí)行任務(wù)不進(jìn)行異步執(zhí)行。
- dep:指向當(dāng)前任務(wù)構(gòu)建的CompletabueFuture
- src:指向源CompletableFuture任務(wù)
CompletableFuture回調(diào)原理
這里為了方便講解,我們用以下簡短的代碼來進(jìn)行分析:
public static void main(String[] args) {
CompletableFuture<String> baseFuture = CompletableFuture.completedFuture("Base Future");
log.info(baseFuture.thenApply((r) -> r + " Then Apply").join());
baseFuture.thenAccept((r) -> log.info(r)).thenAccept((Void) -> log.info("Void"));
}上面的代碼我們通過創(chuàng)建一個(gè)簡單的CompletableFuture對象,再執(zhí)行baseFuture.thenApply()調(diào)用后會進(jìn)行一個(gè)入棧操作,如下圖baseFuture引用的CompletableFuture的stack屬性將會指向baseFuture.thenApply()結(jié)果返回的新CompletableFuture對象,而新CompletableFuture對象的src屬性將指向來源CompletableFuture即baseFuture所引用的對象。

2.在上一步的基礎(chǔ)上再執(zhí)行下一行代碼,結(jié)果的引用關(guān)系圖如下圖:

在執(zhí)行完baseFuture.thenAccept()的時(shí)候,thenAccept返回的任務(wù)將被壓入棧頂,next指向上一個(gè)代碼段的返回對象,在thenAccept返回的新CompletableFuture對象中在進(jìn)行一次thenAccept的調(diào)用,就再產(chǎn)生一個(gè)新的CompletableFuture對象,dept屬性就指向最新的CompletableFuture對象。
thenApply實(shí)現(xiàn)源碼分析
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
// 創(chuàng)建一個(gè)新的CompletableFuture對象
CompletableFuture<V> d = new CompletableFuture<V>();
// e:如果是異步調(diào)用直接執(zhí)行代碼塊
// !d.uniApply:執(zhí)行任務(wù),如果返回false則任務(wù)未執(zhí)行需入棧
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
// 創(chuàng)建出新的UniApply對象進(jìn)行入棧
push(c);
// 嘗試執(zhí)行任務(wù)
c.tryFire(SYNC);
}
return d;
}
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
// 任務(wù)未完成結(jié)果為null直接返回false
if (a == null || (r = a.result) == null || f == null)
return false;
// 驗(yàn)證是否出現(xiàn)異常結(jié)果,如有則任務(wù)執(zhí)行結(jié)束
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
// 異步執(zhí)行任務(wù)
if (c != null && !c.claim())
// 任務(wù)未執(zhí)行返回false
return false;
@SuppressWarnings("unchecked") S s = (S) r;
// 任務(wù)執(zhí)行完成將結(jié)果寫入result
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}以上代碼片段主要描述了CompletableFuture在執(zhí)行任務(wù)時(shí)會創(chuàng)建出新的CompletableFuture對象,使用新對象執(zhí)行任務(wù)并獲取結(jié)果使用CAS寫入到result屬性,如果任務(wù)未執(zhí)行將壓入棧頂,再重新嘗試任務(wù)執(zhí)行,在CompletableFuture其他方法的調(diào)用也都大同小異,這里不在逐一分析,可自行打開源碼閱讀便于理解。
CompletableFuture異步原理
需要進(jìn)行CompletableFuture異步調(diào)用則要使用Async結(jié)尾的方法執(zhí)行任務(wù),這里我們就拿thenAcceptAsync()的源碼進(jìn)行分析。
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
// 如果是異步任務(wù),這里的參數(shù)e不會為空,也就是會將任務(wù)執(zhí)行壓入棧頂
if (e != null || !d.uniAccept(this, f, null)) {
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
push(c);
// 重點(diǎn)還是這個(gè)入口
c.tryFire(SYNC);
}
return d;
}
static final class UniAccept<T> extends UniCompletion<T,Void> {
Consumer<? super T> fn;
UniAccept(Executor executor, CompletableFuture<Void> dep,
CompletableFuture<T> src, Consumer<? super T> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
// dep為空即任務(wù)已被執(zhí)行過,直接返回null
// uniAccept()結(jié)果為false,可能是任務(wù)執(zhí)行中未完成,也可能是由線程池中的其他任務(wù)執(zhí)行完成
if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
// 說明當(dāng)前線程執(zhí)行了該任務(wù),返回結(jié)果繼續(xù)執(zhí)行前一個(gè)任務(wù)
return d.postFire(a, mode);
}
}
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) {
// postComplete調(diào)用過來的,或者上一個(gè)任務(wù)執(zhí)行完成,清空棧數(shù)據(jù),否則調(diào)用postComplete完成任務(wù)
if (mode < 0 || a.result == null)
a.cleanStack();
else
// 完成任務(wù)執(zhí)行并進(jìn)行出棧
a.postComplete();
}
if (result != null && stack != null) {
if (mode < 0)
// postComplete調(diào)用過來的任務(wù)已完成
return this;
else
// 完成任務(wù)執(zhí)行并進(jìn)行出棧
postComplete();
}
return null;
}CompletableFuture進(jìn)行異步主要是通過將任務(wù)壓入棧頂后tryFire方法進(jìn)行異步處理,如果任務(wù)未被執(zhí)行則會通過postFire方法有線程池中的線程進(jìn)行任務(wù)執(zhí)行,任務(wù)執(zhí)行結(jié)果再使用CAS將結(jié)果返回到result,其他線程即可得知任務(wù)是否被執(zhí)行過,如果當(dāng)前現(xiàn)場判斷當(dāng)前任務(wù)為被執(zhí)行,則調(diào)用postComplete()執(zhí)行完成任務(wù)。
總結(jié)
CompletableFuture通過異步回調(diào)的方式,解決了開發(fā)過程中異步調(diào)用獲取結(jié)果的難點(diǎn)。開發(fā)人員只需接觸到CompletableFuture對象,以及CompletableFuture任務(wù)的執(zhí)行結(jié)果,無需設(shè)計(jì)具體異步回調(diào)的實(shí)現(xiàn),并可通過自定義線程池進(jìn)一步優(yōu)化任務(wù)的異步調(diào)用。
到此這篇關(guān)于Java CompletableFuture實(shí)現(xiàn)原理分析詳解的文章就介紹到這了,更多相關(guān)Java CompletableFuture內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot druid數(shù)據(jù)庫配置密碼加密的實(shí)現(xiàn)
Druid是阿里開發(fā)的數(shù)據(jù)庫連接池,本文主要介紹了springboot druid數(shù)據(jù)庫配置密碼加密的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-06-06
Java中synchronized關(guān)鍵字修飾方法同步的用法詳解
synchronized可以用來同步靜態(tài)和非靜態(tài)方法,下面就具體來看一下Java中synchronized關(guān)鍵字修飾方法同步的用法詳解:2016-06-06
Springboot項(xiàng)目異常處理及返回結(jié)果統(tǒng)一
這篇文章主要介紹了Springboot項(xiàng)目異常處理及返回結(jié)果統(tǒng)一,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的朋友可以參考一下2022-08-08
Java使用selenium爬取b站動(dòng)態(tài)的實(shí)現(xiàn)方式
本文主要介紹了Java使用selenium爬取b站動(dòng)態(tài)的實(shí)現(xiàn)方式,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01
IDEA在一個(gè)項(xiàng)目空間下管理多個(gè)項(xiàng)目的操作方法
這篇文章主要介紹了IDEA如何在一個(gè)項(xiàng)目空間下管理多個(gè)項(xiàng)目,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04

