Java 中的 CompletableFuture如何讓異步編程變得簡單
大家在開發(fā)中有沒有遇到過這樣的場景:需要同時調(diào)用好幾個接口,等所有接口都返回結(jié)果后再進行下一步處理?或者某個操作依賴另一個操作的結(jié)果,但又不想讓程序一直等著?
如果還用傳統(tǒng)的多線程或者 Future 來處理,代碼往往寫得又復(fù)雜又難維護。今天就來給大家介紹一個 Java 8 引入的異步編程神器 ——CompletableFuture,它能讓這些復(fù)雜的異步操作變得簡單優(yōu)雅。
一、什么是 CompletableFuture?
簡單說,CompletableFuture 是一個「可以手動完成的 Future」。它不僅能像普通 Future 那樣執(zhí)行異步任務(wù),還提供了一堆實用的方法,讓我們可以輕松實現(xiàn):
- 鏈式調(diào)用(上一個任務(wù)完成后自動執(zhí)行下一個)
- 組合多個任務(wù)(不管是有依賴關(guān)系還是完全獨立)
- 優(yōu)雅處理異常(不用擔(dān)心異步任務(wù)的異常被悄悄吃掉)
形象點說,普通 Future 就像寄快遞,只能等快遞到了自己去取;而 CompletableFuture 更像「快遞上門」,不僅能自動通知你,還能幫你把快遞拆開、分類,甚至直接送到指定位置。
CompletableFuture 同時實現(xiàn)了 Future 和 CompletionStage 接口
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}

二、入門:創(chuàng)建第一個 CompletableFuture
創(chuàng)建 CompletableFuture 主要靠兩個靜態(tài)方法,先看個簡單例子:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class FirstCompletableFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 有返回值的異步任務(wù):supplyAsync
CompletableFuture<String> foodFuture = CompletableFuture.supplyAsync(() -> {
// 模擬耗時操作(比如調(diào)用接口查外賣)
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
return "麻辣燙"; // 任務(wù)結(jié)果
});
// 2. 無返回值的異步任務(wù):runAsync
CompletableFuture<Void> noticeFuture = CompletableFuture.runAsync(() -> {
// 模擬耗時操作(比如發(fā)送通知)
try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("已通知用戶:外賣正在配送中");
});
// 獲取第一個任務(wù)的結(jié)果(會等任務(wù)完成)
String food = foodFuture.get();
System.out.println("用戶點的是:" + food);
// 等待第二個任務(wù)完成(雖然它沒返回值,但我們需要它執(zhí)行完)
noticeFuture.get();
}
}運行結(jié)果:

關(guān)鍵點:
1.supplyAsync(帶Async表示是異步):適合有返回結(jié)果的任務(wù)(比如查數(shù)據(jù)、算結(jié)果),參數(shù)是一個 Supplier(帶返回值的函數(shù))。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
//使用自定義線程池,比較推薦
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}2.runAsync:適合無返回結(jié)果的任務(wù)(比如發(fā)日志、發(fā)通知),參數(shù)是一個 Runnable(無返回值的函數(shù))。
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
//使用自定義線程池,比較推薦
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}3.兩個任務(wù)是并行執(zhí)行的,所以通知先完成(只等了 500ms),外賣查詢后完成(等了 1000ms)。
三、進階:鏈式操作,讓任務(wù)像流水線一樣執(zhí)行
最能體現(xiàn) CompletableFuture 強大的,就是它的鏈式操作。不用手動等待上一個任務(wù)完成,直接指定「下一個要做什么」。
比如我們要完成這樣一個流程:
- 查用戶 ID(耗時 1s)
- 用 ID 查用戶名(耗時 0.5s)
- 打印用戶名(耗時忽略)
用鏈式操作實現(xiàn),代碼會非常清爽:
import java.util.concurrent.CompletableFuture;
public class CompletableFutureChain {
public static void main(String[] args) throws Exception {
// 鏈式操作:一步接一步執(zhí)行
CompletableFuture<Void> pipeline = CompletableFuture.supplyAsync(() -> {
// 第一步:查用戶ID
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("第一步:查到用戶ID = 10086");
return 10086; // 把結(jié)果傳給下一步
}).thenApply(userId -> {
// 第二步:用ID查用戶名(接收上一步的結(jié)果)
try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("第二步:用ID " + userId + " 查到用戶名 = 小明");
return "小明"; // 再把結(jié)果傳給下一步
}).thenAccept(username -> {
// 第三步:打印用戶名(接收上一步的結(jié)果,無返回值)
System.out.println("第三步:最終用戶名是 " + username);
});
// 等待整個流水線完成
pipeline.get();
}
}運行結(jié)果:

常用鏈式方法:
thenApply:接收上一步結(jié)果,處理后返回新結(jié)果(比如「ID→用戶名」的轉(zhuǎn)換)。thenAccept:接收上一步結(jié)果,只處理不返回(比如打印、保存)。thenRun:不關(guān)心上一步結(jié)果,只在完成后執(zhí)行(比如「不管結(jié)果如何,都記錄日志」)。
就像流水線一樣,上一個工序的產(chǎn)品自動傳到下一個工序,全程無需人工干預(yù)。
四、高手篇:組合多個任務(wù),效率翻倍
實際開發(fā)中,我們經(jīng)常需要處理多個任務(wù),有的任務(wù)之間有依賴(比如先登錄才能下單),有的則完全獨立(比如同時加載商品信息和用戶信息)。CompletableFuture 提供了專門的方法來處理這些場景。
1. 處理依賴任務(wù):thenCompose
比如「先查用戶地址,再根據(jù)地址查天氣」,第二個任務(wù)依賴第一個的結(jié)果:
import java.util.concurrent.CompletableFuture;
public class ThenComposeDemo {
// 模擬:查用戶地址
public static CompletableFuture<String> getAddress(String username) {
return CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); }
return username + "的地址是:北京市海淀區(qū)";
});
}
// 模擬:根據(jù)地址查天氣
public static CompletableFuture<String> getWeather(String address) {
return CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); }
return address + ",今天天氣:晴,25℃";
});
}
public static void main(String[] args) throws Exception {
// 組合兩個依賴任務(wù)
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> "小明")
.thenCompose(username -> getAddress(username)) // 先查地址
.thenCompose(address -> getWeather(address)); // 再查天氣
System.out.println(result.get());
}
}運行流程如下:

運行結(jié)果:

thenCompose 就像「接力賽」,第一棒跑完了,把接力棒交給第二棒,確保任務(wù)按順序執(zhí)行。
2. 處理獨立任務(wù):thenCombine
如果兩個任務(wù)毫無關(guān)系,可以并行執(zhí)行,最后合并結(jié)果。比如「同時查商品價格和庫存,計算總價」:
import java.util.concurrent.CompletableFuture;
public class ThenCombineDemo {
// 查價格
public static CompletableFuture<Double> getPrice(String product) {
return CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(product + "的價格是:99.9元");
return 99.9;
});
}
// 查庫存
public static CompletableFuture<Integer> getStock(String product) {
return CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(product + "的庫存是:5件");
return 5;
});
}
public static void main(String[] args) throws Exception {
String product = "Java編程思想";
// 并行執(zhí)行兩個任務(wù),然后合并結(jié)果
CompletableFuture<Double> total = getPrice(product)
.thenCombine(getStock(product), (price, stock) -> price * stock);
System.out.println("總價:" + total.get() + "元");
}
}執(zhí)行流程:

執(zhí)行結(jié)果:

兩個任務(wù)并行執(zhí)行,效率大大提高!
3. 等待所有任務(wù):allOf
如果有一堆任務(wù),需要全部完成后再做處理(比如批量下載多個文件,全部下完后打包):
package com.itheima.future;
import java.util.concurrent.CompletableFuture;
public class AllOfDemo {
public static void main(String[] args) throws Exception {
//計時開始
long start = System.currentTimeMillis();
// 3個下載任務(wù)
CompletableFuture<Void> download1 = CompletableFuture.runAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("文件1下載完成");
});
CompletableFuture<Void> download2 = CompletableFuture.runAsync(() -> {
try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("文件2下載完成");
});
CompletableFuture<Void> download3 = CompletableFuture.runAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("文件3下載完成");
});
// 等待所有任務(wù)完成
CompletableFuture.allOf(download1, download2, download3).get();
System.out.println("所有文件下載完成,開始打包...");
System.out.println("打包完成,耗時:" + (System.currentTimeMillis() - start) + "毫秒");
}
}執(zhí)行流程如下:

運行結(jié)果:

4. 等待任意任務(wù):anyOf
如果多個任務(wù)中,只要有一個完成就可以繼續(xù)(比如查多個數(shù)據(jù)源,哪個快用哪個):
import java.util.concurrent.CompletableFuture;
public class AnyOfDemo {
public static void main(String[] args) throws Exception {
// 3個查詢?nèi)蝿?wù)(不同數(shù)據(jù)源)
CompletableFuture<String> fromCache = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
return "從緩存查到數(shù)據(jù):Java入門";
});
CompletableFuture<String> fromDb = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); }
return "從數(shù)據(jù)庫查到數(shù)據(jù):Java入門";
});
CompletableFuture<String> fromApi = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1200); } catch (InterruptedException e) { e.printStackTrace(); }
return "從API查到數(shù)據(jù):Java入門";
});
// 只要有一個任務(wù)完成就返回
CompletableFuture<Object> result = CompletableFuture.anyOf(fromCache, fromDb, fromApi);
System.out.println("最快的結(jié)果:" + result.get());
}
}運行結(jié)果:

五、異常處理:別讓異步任務(wù)的錯誤悄悄溜走
異步任務(wù)的異常很容易被忽略(比如線程池悄悄吃掉異常),CompletableFuture 提供了貼心的異常處理方法。
1. exceptionally:捕獲異常并返回默認值
import java.util.concurrent.CompletableFuture;
public class ExceptionDemo1 {
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 模擬任務(wù)失敗
if (true) {
throw new RuntimeException("查詢失?。簲?shù)據(jù)庫連接超時");
}
return 100;
}).exceptionally(ex -> {
// 捕獲異常,返回默認值
System.out.println("出錯了:" + ex.getMessage());
return 0; // 默認值
});
System.out.println("最終結(jié)果:" + future.get()); // 輸出 0
}
}執(zhí)行結(jié)果:

2. handle:同時處理正常結(jié)果和異常
import java.util.concurrent.CompletableFuture;
public class ExceptionDemo2 {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 這里可以故意拋出異常測試
return "正常結(jié)果";
}).handle((result, ex) -> {
if (ex != null) {
return "處理異常:" + ex.getMessage();
} else {
return "處理成功:" + result;
}
});
System.out.println(future.get()); // 輸出 處理成功:正常結(jié)果
}
}執(zhí)行結(jié)果:

六、CompletableFuture 的線程池:默認還是自定義?
使用 CompletableFuture 時,線程池的選擇非常關(guān)鍵,它直接影響程序的性能和穩(wěn)定性。
1. 默認線程池:ForkJoinPool.commonPool ()
當(dāng)我們使用無參的supplyAsync()或runAsync()時,CompletableFuture 會默認使用ForkJoinPool.commonPool()作為線程池:
// 使用默認線程池
CompletableFuture.supplyAsync(() -> {
// 任務(wù)邏輯
return "result";
});默認線程池的特點:
- 線程數(shù)量:默認等于 CPU 核心數(shù)(可以通過
-Djava.util.concurrent.ForkJoinPool.common.parallelism參數(shù)調(diào)整)- 適用場景:CPU 密集型任務(wù)(如計算)
- 優(yōu)點:無需手動管理線程池,簡單方便
潛在問題:
- 所有使用默認線程池的任務(wù)會共享這一組線程,高并發(fā)下可能出現(xiàn)資源競爭
- 對于 IO 密集型任務(wù)(如網(wǎng)絡(luò)請求、文件讀寫),固定的線程數(shù)可能導(dǎo)致效率低下
- 當(dāng)有大量任務(wù)時,可能會拖慢所有依賴此線程池的任務(wù)
2. 自定義線程池:更靈活的控制
實際項目中,強烈建議使用自定義線程池,尤其是在生產(chǎn)環(huán)境。我們可以通過帶線程池參數(shù)的方法來指定:
package com.itheima.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class CustomThreadPoolDemo {
// 自定義線程池(這里我暫時使用線程池包里的線程池,一般情況下要使用自定義線程池ThreadPoolExecutor)
private static final ExecutorService customExecutor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
// 使用自定義線程池
CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)在自定義線程池執(zhí)行:" + Thread.currentThread().getName());
return "處理完成";
}, customExecutor)
.thenAccept(result -> {
System.out.println("結(jié)果:" + result);
});
// 記得在程序結(jié)束時關(guān)閉線程池
customExecutor.shutdown();
}
}執(zhí)行結(jié)果:

自定義線程池的優(yōu)勢:
- 隔離不同類型的任務(wù)(例如:查詢數(shù)據(jù)庫的任務(wù)用一個線程池,發(fā)送消息的任務(wù)用另一個)
- 可以根據(jù)任務(wù)類型(CPU 密集 / IO 密集)調(diào)整線程數(shù)量
- 避免默認線程池被某個耗時任務(wù)占滿導(dǎo)致的整體阻塞
線程池配置建議:
- CPU 密集型任務(wù):線程數(shù) = CPU 核心數(shù) + 1
- IO 密集型任務(wù):線程數(shù) = CPU 核心數(shù) × 2(或更多,根據(jù)實際測試調(diào)整)
- 為線程池起一個有意義的名字,方便問題排查(可以通過自定義 ThreadFactory 實現(xiàn))
七、CompletableFuture vs Future:到底有什么不同?
很多人會疑惑,Java 已經(jīng)有了 Future,為什么還需要 CompletableFuture?它們的核心區(qū)別在哪里?
| 特性 | Future | CompletableFuture |
|---|---|---|
| 實現(xiàn)接口 | 僅實現(xiàn) Future 接口 | 實現(xiàn) Future 和 CompletionStage 接口 |
| 鏈式操作 | 不支持,必須阻塞獲取結(jié)果后再處理 | 支持,可通過 thenApply 等方法串聯(lián)多個任務(wù) |
| 異常處理 | 無專門的 API,需要在任務(wù)內(nèi)部捕獲 | 提供 exceptionally、handle 等專門的異常處理方法 |
| 任務(wù)組合 | 不支持,需要手動編寫同步邏輯 | 支持 thenCompose、thenCombine 等多種組合方式 |
| 手動完成 | 不支持 | 支持 complete ()、completeExceptionally () 手動設(shè)置結(jié)果或異常 |
| 阻塞獲取 | 只能通過 get () 阻塞獲取 | 可以阻塞獲取,也可以通過回調(diào)非阻塞處理 |
具體區(qū)別舉例
1. 處理結(jié)果的方式
Future 的方式(繁瑣且必須阻塞):
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "任務(wù)結(jié)果";
});
// 必須阻塞等待結(jié)果
String result = future.get();
// 處理結(jié)果
System.out.println("處理:" + result);
executor.shutdown();CompletableFuture 的方式(非阻塞,鏈式處理):
CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
return "任務(wù)結(jié)果";
}).thenApply(result -> {
return "處理后:" + result;
}).thenAccept(processedResult -> {
System.out.println(processedResult);
});2. 異常處理能力
Future 的方式(異常處理麻煩):
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(() -> {
if (true) {
throw new RuntimeException("計算失敗");
}
return 100;
});
try {
Integer result = future.get(); // 異常會在這里拋出
} catch (Exception e) {
// 處理異常
e.printStackTrace();
}
executor.shutdown();CompletableFuture 的方式(專門的異常處理 API):
CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("計算失敗");
}
return 100;
}).exceptionally(ex -> {
System.out.println("捕獲異常:" + ex.getMessage());
return 0; // 返回默認值
}).thenAccept(result -> {
System.out.println("結(jié)果:" + result); // 輸出0
});3. 多任務(wù)組合能力
Future 幾乎無法優(yōu)雅地組合多個任務(wù),而 CompletableFuture 提供了豐富的組合方式,這也是它最核心的優(yōu)勢。
到此這篇關(guān)于Java 中的 CompletableFuture:讓異步編程變得簡單的文章就介紹到這了,更多相關(guān)Java CompletableFuture異步編程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java使用CompletableFuture實現(xiàn)異步編程
- Java8 CompletableFuture異步編程解讀
- Java異步線程中的CompletableFuture與@Async詳解
- Java中的CompletableFuture異步編程詳解
- java中CompletableFuture異步執(zhí)行方法
- Java8?CompletableFuture?異步多線程的實現(xiàn)
- java CompletableFuture實現(xiàn)異步編排詳解
- java?CompletableFuture異步任務(wù)編排示例詳解
- Java?CompletableFuture實現(xiàn)多線程異步編排
相關(guān)文章
Spring Boot @RestControllerAdvice全局異常處理最佳實踐
本文詳解SpringBoot中通過@RestControllerAdvice實現(xiàn)全局異常處理,強調(diào)代碼復(fù)用、統(tǒng)一響應(yīng)格式、異常分類處理及減少冗余代碼的重要性,涵蓋注解解析、實戰(zhàn)代碼、最佳實踐,感興趣的朋友一起看看吧2025-07-07
kafka 重新分配partition和調(diào)整replica的數(shù)量實現(xiàn)
當(dāng)需要提升Kafka集群的性能和負載均衡時,可通過kafka-reassign-partitions.sh命令手動重新分配Partition,增加節(jié)點后,可以將Topic的Partition的Leader節(jié)點均勻分布,以提高寫入和消費速度,感興趣的可以了解一下2022-03-03
Java Builder Pattern建造者模式詳解及實例
這篇文章主要介紹了Java Builder Pattern建造者模式詳解及實例的相關(guān)資料,需要的朋友可以參考下2017-01-01
Java實現(xiàn)實時監(jiān)控目錄下文件變化的方法
今天小編就為大家分享一篇關(guān)于Java實現(xiàn)實時監(jiān)控目錄下文件變化的方法,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03
解決SecureRandom.getInstanceStrong()引發(fā)的線程阻塞問題
這篇文章主要介紹了解決SecureRandom.getInstanceStrong()引發(fā)的線程阻塞問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
SpringCloud OpenFeign與Ribbon客戶端配置詳解
在springcloud中,openfeign是取代了feign作為負載均衡組件的,feign最早是netflix提供的,他是一個輕量級的支持RESTful的http服務(wù)調(diào)用框架,內(nèi)置了ribbon,而ribbon可以提供負載均衡機制,因此feign可以作為一個負載均衡的遠程服務(wù)調(diào)用框架使用2022-11-11
Spring Boot中自定義注解結(jié)合AOP實現(xiàn)主備庫切換問題
這篇文章主要介紹了Spring Boot中自定義注解+AOP實現(xiàn)主備庫切換的相關(guān)知識,本篇文章的場景是做調(diào)度中心和監(jiān)控中心時的需求,后端使用TDDL實現(xiàn)分表分庫,需要的朋友可以參考下2019-08-08
基于SpringBoot+Redis的Session共享與單點登錄詳解
這篇文章主要介紹了基于SpringBoot+Redis的Session共享與單點登錄,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-07-07

