Java8 自定義CompletableFuture的原理解析
Java8 自定義CompletableFuture原理
Future 接口 的局限性有很多,其中一個(gè)就是需要主動(dòng)的去詢問(wèn)是否完成,如果等子線程的任務(wù)完成以后,通知我,那豈不是更好?
public class FutureInAction3 {
public static void main(String[] args) {
Future<String> future = invoke(() -> {
try {
Thread.sleep(10000L);
return "I am Finished.";
} catch (InterruptedException e) {
return "I am Error";
}
});
future.setCompletable(new Completable<String>() {
@Override
public void complete(String s) {
System.out.println("complete called ---- " + s);
}
@Override
public void exception(Throwable cause) {
System.out.println("error");
cause.printStackTrace();
}
});
System.out.println("....do something else .....");
System.out.println("try to get result ->" + future.get());
}
private static <T> Future<T> invoke(Callable<T> callable) {
AtomicReference<T> result = new AtomicReference<>();
AtomicBoolean finished = new AtomicBoolean(false);
Future<T> future = new Future<T>() {
private Completable<T> completable;
@Override
public T get() {
return result.get();
}
@Override
public boolean isDone() {
return finished.get();
}
// 設(shè)置完成
@Override
public void setCompletable(Completable<T> completable) {
this.completable = completable;
}
// 獲取
@Override
public Completable<T> getCompletable() {
return completable;
}
};
Thread t = new Thread(() -> {
try {
T value = callable.action();
result.set(value);
finished.set(true);
if (future.getCompletable() != null)
future.getCompletable().complete(value);
} catch (Throwable cause) {
if (future.getCompletable() != null)
future.getCompletable().exception(cause);
}
});
t.start();
return future;
}
private interface Future<T> {
T get();
boolean isDone();
// 1
void setCompletable(Completable<T> completable);
// 2
Completable<T> getCompletable();
}
private interface Callable<T> {
T action();
}
// 回調(diào)接口
private interface Completable<T> {
void complete(T t);
void exception(Throwable cause);
}
}

CompleteFuture簡(jiǎn)單使用
Java8 中的 completeFuture 是對(duì) Future 的擴(kuò)展實(shí)現(xiàn), 主要是為了彌補(bǔ) Future 沒有相應(yīng)的回調(diào)機(jī)制的缺陷.
我們先看看 Java8 之前的 Future 的使用
package demos;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author djh on 2019/4/22 10:23
* @E-Mail 1544579459@qq.com
*/
public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService cachePool = Executors.newCachedThreadPool();
Future<String> future = cachePool.submit(() -> {
Thread.sleep(3000);
return "異步任務(wù)計(jì)算結(jié)果!";
});
// 提交完異步任務(wù)后, 主線程可以繼續(xù)干一些其他的事情.
doSomeThingElse();
// 為了獲取異步計(jì)算結(jié)果, 我們可以通過(guò) future.get 和 輪詢機(jī)制來(lái)獲取.
String result;
// Get 方式會(huì)導(dǎo)致當(dāng)前線程阻塞, 這顯然違背了異步計(jì)算的初衷.
// result = future.get();
// 輪詢方式雖然不會(huì)導(dǎo)致當(dāng)前線程阻塞, 但是會(huì)導(dǎo)致高額的 CPU 負(fù)載.
long start = System.currentTimeMillis();
while (true) {
if (future.isDone()) {
break;
}
}
System.out.println("輪詢耗時(shí):" + (System.currentTimeMillis() - start));
result = future.get();
System.out.println("獲取到異步計(jì)算結(jié)果啦: " + result);
cachePool.shutdown();
}
private static void doSomeThingElse() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我的最重要的事情干完了, 我要獲取異步計(jì)算結(jié)果來(lái)執(zhí)行剩下的事情.");
}
}
輸出:
我的最重要的事情干完了, 我要獲取異步計(jì)算結(jié)果來(lái)執(zhí)行剩下的事情.
輪詢耗時(shí):2000
獲取到異步計(jì)算結(jié)果啦: 異步任務(wù)計(jì)算結(jié)果!Process finished with exit code 0
從上面的 Demo 中我們可以看出, future 在執(zhí)行異步任務(wù)時(shí), 對(duì)于結(jié)果的獲取顯的不那么優(yōu)雅, 很多第三方庫(kù)就針對(duì) Future 提供了回調(diào)式的接口以用來(lái)獲取異步計(jì)算結(jié)果, 如Google的: ListenableFuture, 而 Java8 所提供的 CompleteFuture 便是官方為了彌補(bǔ)這方面的不足而提供的 API.
下面簡(jiǎn)單介紹用法
package demos;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author djh on 2019/5/1 20:26
* @E-Mail 1544579459@qq.com
*/
public class CompleteFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFutureOne = new CompletableFuture<>();
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(() -> {
try {
Thread.sleep(3000);
completableFutureOne.complete("異步任務(wù)執(zhí)行結(jié)果");
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// WhenComplete 方法返回的 CompletableFuture 仍然是原來(lái)的 CompletableFuture 計(jì)算結(jié)果.
CompletableFuture<String> completableFutureTwo = completableFutureOne.whenComplete((s, throwable) -> {
System.out.println("當(dāng)異步任務(wù)執(zhí)行完畢時(shí)打印異步任務(wù)的執(zhí)行結(jié)果: " + s);
});
// ThenApply 方法返回的是一個(gè)新的 completeFuture.
CompletableFuture<Integer> completableFutureThree = completableFutureTwo.thenApply(s -> {
System.out.println("當(dāng)異步任務(wù)執(zhí)行結(jié)束時(shí), 根據(jù)上一次的異步任務(wù)結(jié)果, 繼續(xù)開始一個(gè)新的異步任務(wù)!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.length();
});
System.out.println("阻塞方式獲取執(zhí)行結(jié)果:" + completableFutureThree.get());
cachePool.shutdown();
}
}
從上面的 Demo 中我們主要需要注意 thenApply 和 whenComplete 這兩個(gè)方法, 這兩個(gè)方法便是 CompleteFuture 中最具有意義的方法, 他們都會(huì)在 completeFuture 調(diào)用 complete 方法傳入異步計(jì)算結(jié)果時(shí)回調(diào), 從而獲取到異步任務(wù)的結(jié)果.
相比之下 future 的阻塞和輪詢方式獲取異步任務(wù)的計(jì)算結(jié)果, CompleteFuture 獲取結(jié)果的方式就顯的優(yōu)雅的多。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- Java8?CompletableFuture?runAsync學(xué)習(xí)總結(jié)submit()?execute()等
- Java?CompletableFuture實(shí)現(xiàn)多線程異步編排
- 詳解Java8?CompletableFuture的并行處理用法
- Java8 使用工廠方法supplyAsync創(chuàng)建CompletableFuture實(shí)例
- Java8 CompletableFuture 異步執(zhí)行操作
- Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn)
- Java8新的異步編程方式CompletableFuture實(shí)現(xiàn)
- Java8 CompletableFuture詳解
- Java中的CompletableFuture原理與用法
相關(guān)文章
Java Spring5學(xué)習(xí)之JdbcTemplate詳解
這篇文章主要介紹了Java Spring5學(xué)習(xí)之JdbcTemplate詳解,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-05-05
Mybatis-Plus實(shí)現(xiàn)用戶ID自增出現(xiàn)的問(wèn)題解決
項(xiàng)目基于 SpringBoot + MybatisPlus 3.5.2 使用數(shù)據(jù)庫(kù)自增ID時(shí), 出現(xiàn)重復(fù)鍵的問(wèn)題,本文就來(lái)介紹一下解決方法,感興趣的可以了解一下2023-09-09
Java中parallelStream().forEach()的踩坑日記
本文主要介紹了Java中parallelStream().forEach()的踩坑日記,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06
nacos服務(wù)注冊(cè)服務(wù)發(fā)現(xiàn)依賴配置詳解
這篇文章主要為大家介紹了nacos服務(wù)注冊(cè)服務(wù)發(fā)現(xiàn)依賴配置詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09
java使用UDP實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)通信
這篇文章主要為大家詳細(xì)介紹了java使用UDP實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)通信,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-06-06
解決spirngboot連接redis報(bào)錯(cuò):READONLY?You?can‘t?write?against?
docker部署的redis,springboot基本每天來(lái)連redis都報(bào)錯(cuò):READONLY?You?can't?write?against?a?read?only?replica,重啟redis后,可以正常連接。但是每天都重啟redis,不現(xiàn)實(shí),也很麻煩,今天給大家分享解決方式,感興趣的朋友一起看看吧2023-06-06
Java8函數(shù)式接口UnaryOperator用法示例
這篇文章主要介紹了Java8函數(shù)式接口UnaryOperator用法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07
Java接口默認(rèn)方法帶來(lái)的問(wèn)題分析【二義性問(wèn)題】
這篇文章主要介紹了Java接口默認(rèn)方法帶來(lái)的問(wèn)題,結(jié)合實(shí)例形式分析了java接口帶來(lái)的二義性問(wèn)題,需要的朋友可以參考下2019-08-08

