java8 Future異步調(diào)用實(shí)現(xiàn)方式
一、同步與異步調(diào)用概念
- 同步API:調(diào)用方在調(diào)用某個(gè)方法后,等待被調(diào)用方返回結(jié)果;調(diào)用方在取得被調(diào)用方的返回值后,再繼續(xù)運(yùn)行。調(diào)用方順序執(zhí)行,同步等待被調(diào)用方的返回值,這就是阻塞式調(diào)用。
- 異步API:調(diào)用方在調(diào)用某個(gè)方法后,直接返回,不需要等待被調(diào)用方返回結(jié)果;被調(diào)用方開(kāi)啟一個(gè)線程處理任務(wù),調(diào)用方可以同時(shí)去處理其他工作。調(diào)用方和被調(diào)用方是異步的,這就是非阻塞式調(diào)用。
在Java種,F(xiàn)uture用來(lái)完成異步工作任務(wù),極大地提高了程序的運(yùn)行效率。
二、Future實(shí)現(xiàn)異步調(diào)用
2.1 future實(shí)現(xiàn)異步調(diào)用的方式
在Java8之前,直接使用Future以異步的方式執(zhí)行一個(gè)耗時(shí)的操作。通過(guò)這種編程方式,調(diào)用方線程使用ExecutorService,以并發(fā)方式調(diào)用另一個(gè)線程,在執(zhí)行耗時(shí)操作的同時(shí),去執(zhí)行一些其他的任務(wù)。
代碼示例:
package com.mvp.test;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureUseTest {
//創(chuàng)建Executor-Service,實(shí)現(xiàn)向線程池提交任務(wù)
ExecutorService executor = Executors.newCachedThreadPool();
@Test
public void futureTest() {
long start = System.nanoTime();
//向 Executor-Service 提交一個(gè)Callable 對(duì)象
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
//以異步方式在新的線程中執(zhí)行耗時(shí)的操作
return doSomeLongComputation(start);
}
});
//異步操作進(jìn)行的同時(shí),你可以做其他的事情
doSomethingElse(start);
Double result = 0.0;
try {
//獲取異步操作的結(jié)果,如果最終被阻塞,無(wú)法得到結(jié)果,那么在最多等待1秒鐘之后退出
result = future.get(2, TimeUnit.SECONDS);
// result = future.get();
} catch (ExecutionException ee) {
System.out.println("ExecutionException=" + Arrays.toString(ee.getStackTrace()));
// 計(jì)算拋出一個(gè)異常
} catch (InterruptedException ie) {
System.out.println("InterruptedException=" + Arrays.toString(ie.getStackTrace()));
// 當(dāng)前線程在等待過(guò)程中被中斷
} catch (TimeoutException te) {
System.out.println("TimeoutException=" + Arrays.toString(te.getStackTrace()));
// 在Future對(duì)象完成之前超過(guò)已過(guò)期
}
System.out.println("全部計(jì)算完成,耗時(shí):"+ (System.nanoTime() - start) / 1_000_000 + " msecs");
System.out.println("result=" + result);
}
public double doSomeLongComputation(Long start) {
delayLong();
System.out.println("異步執(zhí)行一個(gè)長(zhǎng)的計(jì)算,耗時(shí):" + (System.nanoTime() - start) / 1_000_000 + " msecs");
return 65000.00;
}
public void doSomethingElse(Long start) {
delay();
System.out.println("當(dāng)前線程做別的計(jì)算,耗時(shí):"+ (System.nanoTime() - start) / 1_000_000 + " msecs");
}
private void delay() {
try {
Thread.sleep( (long) (Math.random() * 1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void delayLong() {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}運(yùn)行結(jié)果為:
當(dāng)前線程做別的計(jì)算,耗時(shí):403 msecs
異步執(zhí)行一個(gè)長(zhǎng)的計(jì)算,耗時(shí):1506 msecs
全部計(jì)算完成,耗時(shí):1506 msecs
result=65000.0
2.2 使用CompletableFuture來(lái)實(shí)現(xiàn)異步調(diào)用
在java中,引入了CompletableFuture,更為方便地實(shí)現(xiàn)異步調(diào)用。
代碼示例為:
package com.mvp.test;
import org.junit.Test;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public class CompletableFutureUsetTest {
private double calculateAsyncPrice(String product) {
delayLong();
double price = new Random().nextDouble()* 1000 + 150;
// System.out.println("calculate Price Of " + product + "is: " + price);
return price;
}
private double calculatePrice(String product) {
delay();
double price = new Random().nextDouble()* 1000 + 150;
// System.out.println("calculate Price Of " + product + "is: " + price);
return price;
}
public Future<Double> getPriceAsync(String product) {
// 創(chuàng)建 CompletableFuture對(duì)象,它會(huì)包含計(jì)算的結(jié)果
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
// 在另一個(gè)線程中以異步方式執(zhí)行計(jì)算
new Thread( () -> {
System.out.println("異步線程處理中");
try {
// 如果價(jià)格計(jì)算正常結(jié)束,完成Future操作,并設(shè)置商品價(jià)格
double price = calculateAsyncPrice(product);
// 設(shè)置Future的返回值,用以獲得需長(zhǎng)時(shí)間計(jì)算的任務(wù)的結(jié)果
futurePrice.complete(price);
} catch (Exception ex) {
// 若存在導(dǎo)致失敗的異常,則強(qiáng)制這次Future操作異常結(jié)束,并拋出Future完成異常
futurePrice.completeExceptionally(ex);
}
}).start();
// 無(wú)需等待,直接返回 Future 對(duì)象
return futurePrice;
}
public double getPriceDirect(Long start, String product) {
double price = calculatePrice(product);
System.out.println("當(dāng)前線程去查詢" + product + "的價(jià)格,耗時(shí):"+ (System.nanoTime() - start) / 1_000_000 + " msecs");
return price;
}
@Test
public void completableFutureTest() {
// 執(zhí)行異步任務(wù)
long startNanoTime = System.nanoTime();
Future<Double> futurePrice = getPriceAsync("籃球");
long returnFutureNanoTime = System.nanoTime();
long invocationTime = ((returnFutureNanoTime - startNanoTime) / 1_000_000);
System.out.println("調(diào)用getPriceAsyc方法直接返回,耗時(shí): " + invocationTime + " msecs");
// 執(zhí)行同步任務(wù)
double priceDirect = getPriceDirect(returnFutureNanoTime, "足球");
double priceAsync = 0.0;
try {
priceAsync = futurePrice.get();
// priceAsync = futurePrice.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
//throw new RuntimeException(e);
System.out.println("exception=" + e.toString());
}
System.out.printf("籃球和足球的總價(jià)格是: %.2f, futurePrice.get()耗時(shí)=%s msecs %n", priceAsync + priceDirect, (System.nanoTime() - returnFutureNanoTime) / 1_000_000);
long retrievalTime = ((System.nanoTime() - startNanoTime) / 1_000_000);
System.out.println("總耗時(shí):" + retrievalTime + " msecs");
}
private void delay() {
try {
//Thread.sleep( (long) (Math.random() * 1000));
Thread.sleep( 200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void delayLong() {
try {
Thread.sleep( 1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}運(yùn)行結(jié)果為:
調(diào)用getPriceAsyc方法直接返回,耗時(shí): 199 msecs
異步線程處理中
當(dāng)前線程去查詢足球的價(jià)格,耗時(shí):201 msecs
籃球和足球的總價(jià)格是: 914.33, futurePrice.get()耗時(shí)=1500 msecs
總耗時(shí):1704 msecs
CompletableFuture類提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個(gè)流程,不需擔(dān)心實(shí)現(xiàn)的細(xì)節(jié)。
例如,在采用supplyAsync方法后,可以用一行語(yǔ)句重寫(xiě)上例中的getPriceAsync方法,如下所示:
package com.mvp.test;
import org.junit.Test;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public class CompletableFutureSupplyAsyncTest {
private double calculateAsyncPrice(String product) {
delayLong();
double price = new Random().nextDouble()* 1000 + 150;
//System.out.println("calculate Price Of " + product + "is: " + price);
return price;
}
private double calculatePrice(String product) {
delay();
double price = new Random().nextDouble()* 1000 + 150;
//System.out.println("calculate Price Of " + product + "is: " + price);
return price;
}
//使用工廠方法 supplyAsync 創(chuàng)建 CompletableFuture 對(duì)象
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculateAsyncPrice(product));
}
public double getPriceDirect(Long start, String product) {
double price = calculatePrice(product);
System.out.println("當(dāng)前線程去查詢" + product + "的價(jià)格, 耗時(shí):"+ (System.nanoTime() - start) / 1_000_000 + " msecs");
return price;
}
@Test
public void futureSupplyAsyncTest() {
// 執(zhí)行異步任務(wù)
long startNanoTime = System.nanoTime();
Future<Double> futurePrice = getPriceAsync("籃球");
long returnFutureNanoTime = System.nanoTime();
long invocationTime = ((returnFutureNanoTime - startNanoTime) / 1_000_000);
System.out.println("調(diào)用getPriceAsyc方法直接返回,耗時(shí): " + invocationTime + " msecs");
// 執(zhí)行同步任務(wù)
long startSyncNanoTime = System.nanoTime();
double priceDirect = getPriceDirect(startSyncNanoTime, "足球");
double priceAsync = 0.0;
try {
priceAsync = futurePrice.get();
// priceAsync = futurePrice.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
//throw new RuntimeException(e);
System.out.println("exception=" + e.toString());
}
System.out.printf("籃球和足球的總價(jià)格是: %.2f, futurePrice.get() 耗時(shí):%s msecs %n", priceAsync + priceDirect, (System.nanoTime() - returnFutureNanoTime) / 1_000_000);
long retrievalTime = ((System.nanoTime() - startNanoTime) / 1_000_000);
System.out.println("總耗時(shí):" + retrievalTime + " msecs");
}
private void delay() {
try {
//Thread.sleep( (long) (Math.random() * 1000));
Thread.sleep( 200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void delayLong() {
try {
Thread.sleep( 1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}運(yùn)行結(jié)果為:
調(diào)用getPriceAsyc方法直接返回,耗時(shí): 176 msecs
當(dāng)前線程去查詢足球的價(jià)格, 耗時(shí):204 msecs
籃球和足球的總價(jià)格是: 997.99, futurePrice.get() 耗時(shí):1502 msecs
總耗時(shí):1681 msecs
supplyAsync方法接受一個(gè)生產(chǎn)者(Supplier)作為參數(shù),返回一個(gè)CompletableFuture對(duì)象(在完成異步執(zhí)行后,該對(duì)象會(huì)讀取異步方法的返回值)。
異步方法會(huì)交由ForkJoinPool池中的某個(gè)執(zhí)行器(Executor)運(yùn)行,也可以使用supplyAsync方法的重載版本,傳遞第2個(gè)參數(shù)指定不同的執(zhí)行器(Executor)執(zhí)行異步方法。
一般而言,向CompletableFuture的工廠方法傳遞可選參數(shù),指定異步方法的執(zhí)行器。
三、流順序執(zhí)行、并行、并發(fā)–異步執(zhí)行、并發(fā)–自定義異步執(zhí)行比較
對(duì)流順序執(zhí)行、并行、并發(fā)–異步執(zhí)行、并發(fā)–自定義異步執(zhí)行進(jìn)行比較,代碼如下:
package com.mvp.test;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
public class CompareParallelFutureUseTest {
List<String> shopNames = Arrays.asList("北京華聯(lián)", "華潤(rùn)", "沃爾瑪", "大潤(rùn)發(fā)", "萬(wàn)果園", "一峰");
private double calculatePrice(String product) {
double price = new Random().nextDouble()* 1000 + 150;
//System.out.println("calculate Price Of " + product + "is: " + price);
return price;
}
public double getPrice(String product) {
return calculatePrice(product);
}
/**
* 使用流順序計(jì)算
* @param product 商品名稱
* @return 列表
*/
public List<String> findPrices(String product) {
return shopNames.stream()
.map(shopName -> String.format("%s 價(jià)格: %.2f", shopName, getPrice(product)))
.collect(Collectors.toList());
}
/**
* 使用流并行計(jì)算
* @param product 商品名稱
* @return 列表
*/
public List<String> findPricesParallel(String product) {
return shopNames.parallelStream()
.map(shopName -> String.format("%s 價(jià)格: %.2f", shopName, getPrice(product)))
.collect(Collectors.toList());
}
/**
* 異步運(yùn)算
* @param product 商品名稱
* @return 列表
*/
public List<String> findPricesFuture(String product) {
List<CompletableFuture<String>> priceFutures = shopNames.stream()
.map(shopName -> CompletableFuture.supplyAsync(() -> String.format("%s 價(jià)格: %.2f", shopName, getPrice(product))))
.collect(Collectors.toList());
//CompletableFuture類中的join方法 和 Future接口中的get方法 有相同的含義
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
//創(chuàng)建一個(gè)線程池,其線程數(shù)目為100和商店數(shù)目二者中較小的一個(gè)值
private final Executor executor1 = Executors.newFixedThreadPool(Math.min(shopNames.size(), 10),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
// 使用守護(hù)線程。這種方式不會(huì)阻止程序的關(guān)停。
t.setDaemon(true);
return t;
}
});
/**
* 異步運(yùn)算:使用定制的執(zhí)行器(調(diào)整線程池的大小)
* @param product 商品名稱
* @return 列表
*/
public List<String> findPricesFutureCustom(String product) {
List<CompletableFuture<String>> priceFutures = shopNames.stream()
.map(shopName -> CompletableFuture.supplyAsync(() -> String.format("%s 價(jià)格: %.2f", shopName, getPrice(product)), executor1))
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
@Test
public void futureCompareTest() {
long start = System.nanoTime();
System.out.println(findPrices("羽毛球"));
System.out.println("使用流順序計(jì)算 Done in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
start = System.nanoTime();
System.out.println(findPricesParallel("羽毛球"));
System.out.println("使用流并行計(jì)算 Done in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
start = System.nanoTime();
System.out.println(findPricesFuture("羽毛球"));
System.out.println("并發(fā)Future異步運(yùn)算(默認(rèn)執(zhí)行器) Done in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
//并行和并發(fā)不相伯仲,究其原因都一樣:它們內(nèi)部采用的是同樣的通用線程池,默認(rèn)都使用固定數(shù)目的線程,具體線程數(shù)取決于
// Runtime.getRuntime().availableProcessors() 的返回值。
// 然而,CompletableFuture具有一定的優(yōu)勢(shì),因?yàn)樗试S你對(duì)執(zhí)行器(Executor)進(jìn)行配置,尤其是線程池的大小。
start = System.nanoTime();
System.out.println(findPricesFutureCustom("羽毛球"));
System.out.println("并發(fā)Future異步運(yùn)算(定制執(zhí)行器:調(diào)整線程池的大小) Done in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
}
}運(yùn)行結(jié)果為:
[北京華聯(lián) 價(jià)格: 552.91, 華潤(rùn) 價(jià)格: 173.53, 沃爾瑪 價(jià)格: 981.30, 大潤(rùn)發(fā) 價(jià)格: 339.54, 萬(wàn)果園 價(jià)格: 872.71, 一峰 價(jià)格: 338.87]
使用流順序計(jì)算 Done in 148 msecs
[北京華聯(lián) 價(jià)格: 475.23, 華潤(rùn) 價(jià)格: 991.62, 沃爾瑪 價(jià)格: 469.81, 大潤(rùn)發(fā) 價(jià)格: 1140.04, 萬(wàn)果園 價(jià)格: 199.57, 一峰 價(jià)格: 210.05]
使用流并行計(jì)算 Done in 5 msecs
[北京華聯(lián) 價(jià)格: 723.78, 華潤(rùn) 價(jià)格: 546.76, 沃爾瑪 價(jià)格: 979.16, 大潤(rùn)發(fā) 價(jià)格: 402.02, 萬(wàn)果園 價(jià)格: 770.86, 一峰 價(jià)格: 601.99]
并發(fā)Future異步運(yùn)算(默認(rèn)執(zhí)行器) Done in 5 msecs
[北京華聯(lián) 價(jià)格: 854.24, 華潤(rùn) 價(jià)格: 1000.75, 沃爾瑪 價(jià)格: 1103.58, 大潤(rùn)發(fā) 價(jià)格: 355.49, 萬(wàn)果園 價(jià)格: 849.84, 一峰 價(jià)格: 1051.99]
并發(fā)Future異步運(yùn)算(定制執(zhí)行器:調(diào)整線程池的大小) Done in 4 msecs
從運(yùn)行結(jié)果可以看出,流并行計(jì)算、異步運(yùn)算、自定義執(zhí)行器異步運(yùn)算的效率比流順序計(jì)算要高很多。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring AOP如何整合redis(注解方式)實(shí)現(xiàn)緩存統(tǒng)一管理詳解
這篇文章主要給大家介紹了關(guān)于Spring AOP如何整合redis(注解方式)實(shí)現(xiàn)緩存統(tǒng)一管理的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-08-08
SpringBoot+Redis防止接口重復(fù)提交問(wèn)題
這篇文章主要介紹了SpringBoot+Redis防止接口重復(fù)提交問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-06-06
java JTree JCheckBox樹(shù)復(fù)選框詳解
這篇文章主要為大家詳細(xì)介紹了java JTree JCheckBox樹(shù)復(fù)選框的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-11-11
Java?中很好用的數(shù)據(jù)結(jié)構(gòu)EnumSet
這篇文章主要介紹了Java?中很好用的數(shù)據(jù)結(jié)構(gòu)EnumSet,EnumMap即屬于一個(gè)Map,下文圍繞主題展開(kāi)詳細(xì)內(nèi)容,需要的小伙伴可以參考參考一下2022-05-05
Java中使用HttpPost發(fā)送form格式的請(qǐng)求實(shí)現(xiàn)代碼
在Java中使用HttpPost發(fā)送form格式的請(qǐng)求,可以使用Apache HttpClient庫(kù)來(lái)實(shí)現(xiàn),這篇文章主要介紹了Java中使用HttpPost發(fā)送form格式的請(qǐng)求,本文給大家展示示例代碼,需要的朋友可以參考下2023-08-08

