Java開發(fā)異步編程中常用的接口和類示例詳解
Callable 與 Runnable接口
- Callable接口類似于Runnable接口,都是用于實現(xiàn)一個線程任務。
- Callable實現(xiàn)的線程可以返回執(zhí)行結(jié)果,但是Runnable不可以返回執(zhí)行結(jié)果。
- Callable的run方法會拋出編譯時異常,而Runnable沒有拋出異常。
- 可以使用工廠類Executors把Runnable包裝成Callable。
Runnable使用方式一:使用Thread運行
public class ThreadCreationDemo {
// 第一種方式需要一個實現(xiàn)了Runnable接口的類
static class MyRunnable implements Runnable {
@Override
public void run() {
// 打印當前執(zhí)行此代碼的線程的名稱
System.out.println("方法一 (MyRunnable): 線程 " + Thread.currentThread().getName() + " 正在運行。");
}
}
public static void main(String[] args) {
// 第一種方式: 使用一個獨立的類去實現(xiàn)Runnable接口
// 這種方法結(jié)構清晰,適用于邏輯比較復雜的場景。
Thread t1 = new Thread(new MyRunnable());
t1.start();
// 第二種方式: 使用Lambda表達式實現(xiàn)一個Runnable
// 這是Java 8+中最推薦的簡潔寫法。
Thread t2 = new Thread(() -> {
System.out.println("方法二 (Lambda): 線程 " + Thread.currentThread().getName() + " 正在運行。");
});
t2.start();
// 第三種方式: 使用匿名內(nèi)部類實現(xiàn)一個Runnable
// 這是在Java 8出現(xiàn)之前常用的方法。
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("方法三 (匿名內(nèi)部類): 線程 " + Thread.currentThread().getName() + " 正在運行。");
}
});
t3.start();
}
}
Runnable使用方式二:使用線程池執(zhí)行
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorServiceDemo {
// 定義一個實現(xiàn)了Runnable接口的類,用于作為要執(zhí)行的任務
static class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("任務由線程: " + Thread.currentThread().getName() + " 執(zhí)行。");
try {
// 模擬任務執(zhí)行耗時
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
// 第二種方式: 使用“線程池”去運行Runnable
// 使用Executors工廠類創(chuàng)建一個單線程的線程池
// 這個線程池保證任務是按提交順序依次執(zhí)行的
ExecutorService executorService = Executors.newSingleThreadExecutor();
System.out.println("使用 executorService.execute() 提交任務。");
// execute(Runnable): 提交一個任務,沒有返回值。
executorService.execute(new MyRunnable());
System.out.println("使用 executorService.submit() 提交任務。");
// submit(Runnable): 同樣提交一個任務,但會返回一個Future對象。
// 這個Future對象可以用來判斷任務是否執(zhí)行完畢.
Future<?> future = executorService.submit(new MyRunnable());
// --- 把Runnable通過工具類轉(zhuǎn)成Callable ---
// Callable與Runnable類似,但它可以有返回值并能拋出異常。
// Executors.callable()方法可以將一個Runnable轉(zhuǎn)換為Callable,
// 其執(zhí)行結(jié)果為null。
Callable<Object> callable = Executors.callable(new MyRunnable());
System.out.println("提交轉(zhuǎn)換后的Callable任務。");
Future<Object> callableFuture = executorService.submit(callable);
// --- 關閉線程池 ---
executorService.shutdown();
}
}
Callable使用方式一:包裝到FutureTask里面,然后通過Thread運行
因為FutureTask實現(xiàn)了Runnable接口,所以可以作為參數(shù)傳入Thread中
一般寫法
代碼示例:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 創(chuàng)建一個有返回值的任務 (Callable)
Callable<String> myTask = () -> {
System.out.println("子線程: 正在執(zhí)行任務...");
Thread.sleep(2000); // 模擬耗時2秒
return "任務執(zhí)行完畢,這是結(jié)果";
};
// 2. 將任務包裝成 FutureTask (如圖所示)
FutureTask<String> futureTask = new FutureTask<>(myTask);
// 3. 把 FutureTask 交給一個新線程去執(zhí)行 (如圖所示)
// 因為 FutureTask 實現(xiàn)了 Runnable 接口,所以 Thread 可以直接運行它
Thread thread = new Thread(futureTask);
thread.start();
System.out.println("主線程: 已啟動子線程,我先忙別的...");
// 4. 在主線程中獲取任務結(jié)果
// get() 方法會阻塞當前線程,直到子線程的任務完成
System.out.println("主線程: 現(xiàn)在需要結(jié)果了,開始等待...");
String result = futureTask.get(); // 等待并獲取結(jié)果
System.out.println("主線程: 終于拿到了結(jié)果 - \"" + result + "\"");
}
}
運行結(jié)果:
主線程: 已啟動子線程,我先忙別的...
子線程: 正在執(zhí)行任務...
主線程: 現(xiàn)在需要結(jié)果了,開始等待...
(程序在這里會暫停2秒)
主線程: 終于拿到了結(jié)果 - "任務執(zhí)行完畢,這是結(jié)果"
也可以用匿名內(nèi)部類和Lambda表達式來定義
Callable任務
匿名內(nèi)部類寫法
匿名內(nèi)部類是在Java 8之前的標準寫法,語法上會顯得比較冗長。
// 使用匿名內(nèi)部類來定義 Callable 任務
FutureTask<String> futureTask_anonymous = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("通過匿名內(nèi)部類執(zhí)行...");
Thread.sleep(1000);
return "匿名內(nèi)部類的結(jié)果";
}
});
// 然后同樣交給 Thread 或線程池執(zhí)行
// Thread thread = new Thread(futureTask_anonymous);
// thread.start();
Lambda表達式寫法
Lambda表達式是Java 8及以后推薦的函數(shù)式編程寫法,非常簡潔。
由于Callable接口是一個函數(shù)式接口(只有一個抽象方法call()),所以可以被Lambda表達式替代。
// 使用Lambda表達式來定義 Callable 任務
FutureTask<String> futureTask_lambda = new FutureTask<>(() -> {
System.out.println("通過Lambda表達式執(zhí)行...");
Thread.sleep(1000);
return "Lambda的結(jié)果";
});
// 然后同樣交給 Thread 或線程池執(zhí)行
// Thread thread = new Thread(futureTask_lambda);
// thread.start();
Callable使用方式二:使用線程池執(zhí)行
execute方法只能接收Runnable對象,所以Callable對象不能使用線程池的execute方法執(zhí)行,只能使用submit方法執(zhí)行。
執(zhí)行會返回Future類的對象,對這個對象調(diào)用get()方法即可以拿到返回值。
示例代碼:
import java.util.concurrent.*;
public class ExecutorServiceDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 創(chuàng)建一個線程池 (圖中是單線程的,更常用的是固定大小的線程池)
ExecutorService executorService = Executors.newFixedThreadPool(2); // 例如,一個有兩個線程的池
System.out.println("主線程: 準備將任務提交給線程池。");
// 2. 將 Callable 任務提交給線程池 (直接使用Lambda表達式定義任務)
// submit() 方法會立即返回一個 Future 對象
Future<String> future = executorService.submit(() -> {
System.out.println("子線程 (" + Thread.currentThread().getName() + "): 開始執(zhí)行耗時任務...");
Thread.sleep(2000); // 模擬耗時2秒
return "任務完成,這是來自線程池的結(jié)果";
});
System.out.println("主線程: 任務已提交,我可以繼續(xù)做其他事情...");
// 在這里,主線程可以執(zhí)行其他不受阻塞的代碼
// 3. 在需要結(jié)果時,調(diào)用 future.get() 阻塞等待
System.out.println("主線程: 現(xiàn)在需要結(jié)果了,開始等待...");
String result = future.get(); // 阻塞,直到子線程任務完成并返回結(jié)果
System.out.println("主線程: 成功獲取到結(jié)果 -> \"" + result + "\"");
// 4. 關閉線程池 (這是必須的步驟,否則JVM不會退出)
executorService.shutdown();
}
}
Future接口
- 一個Future表示一個異步任務的結(jié)果;
- 它提供了一些方法用來檢查異步是不是已經(jīng)完成,沒有完成就等待,待其完成后取回異步執(zhí)行的結(jié)果;
- 異步執(zhí)行的結(jié)果,只能通過get()方法獲取,get()方法是阻塞的,如果異步執(zhí)行沒有執(zhí)行結(jié)束 ,則阻塞直至拿到結(jié)果;
- 它提供了一個cancel()方法用于取消異步執(zhí)行,執(zhí)行完畢后不可取消;
- 如果不想要返回的結(jié)果,也可以把底層的任務聲明為返回null。
Future接口有5個方法:

boolean isDone(): 判斷任務是否已經(jīng)完成。V get(): 阻塞等待,直到任務完成并返回結(jié)果。V get(long timeout, TimeUnit unit): 在指定時間內(nèi)阻塞等待,超時則拋出TimeoutException。boolean isCancelled(): 判斷任務是否在完成前被取消。boolean cancel(boolean mayInterruptIfRunning): 嘗試取消任務的執(zhí)行。
import java.util.concurrent.*;
public class FutureMethodsDemo {
public static void main(String[] args) {
// 創(chuàng)建一個固定大小為2的線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
System.out.println("--- 演示 get(), isDone() ---");
// 提交一個 Callable 任務,該任務會模擬2秒的計算,然后返回一個字符串結(jié)果
Future<String> future1 = executor.submit(() -> {
System.out.println("任務1: 開始執(zhí)行,預計耗時2秒...");
Thread.sleep(2000);
return "任務1執(zhí)行完畢!";
});
// 1. isDone()
// 在任務剛提交時,檢查它是否完成
System.out.println("提交任務1后,立即檢查 isDone(): " + future1.isDone());
try {
// 2. get()
// 這是一個阻塞方法,主線程會在這里等待,直到任務1執(zhí)行完成并返回結(jié)果
System.out.println("調(diào)用 future1.get(),主線程等待中...");
String result1 = future1.get();
System.out.println("future1.get() 獲得結(jié)果: \"" + result1 + "\"");
// 任務完成后,再次檢查isDone()
System.out.println("任務1完成后,再次檢查 isDone(): " + future1.isDone());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("\n--- 演示 get(timeout, unit) ---");
// 提交一個需要3秒才能完成的任務
Future<String> future2 = executor.submit(() -> {
System.out.println("任務2: 開始執(zhí)行,預計耗時3秒...");
Thread.sleep(3000);
return "任務2執(zhí)行完畢!";
});
try {
// 3. get(long timeout, TimeUnit unit)
// 我們只等待1秒,由于任務需要3秒,所以這必定會超時
System.out.println("調(diào)用 future2.get(1, TimeUnit.SECONDS),主線程最多等待1秒...");
String result2 = future2.get(1, TimeUnit.SECONDS);
System.out.println("在1秒內(nèi)獲取到結(jié)果: " + result2);
} catch (TimeoutException e) {
// 這里會捕獲到超時異常
System.out.println("等待超時! " + e.getClass().getName());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("\n--- 演示 cancel(), isCancelled() ---");
// 提交一個長時間運行的任務
Future<String> future3 = executor.submit(() -> {
System.out.println("任務3: 開始執(zhí)行,這是一個可能被中斷的長任務...");
try {
Thread.sleep(5000); // 模擬一個非常耗時的操作
} catch (InterruptedException e) {
// 如果任務被中斷,會進入這里
System.out.println("任務3: 執(zhí)行被中斷!");
return "已被中斷";
}
return "任務3正常完成";
});
// 為了確保 cancel() 在任務開始后執(zhí)行,我們稍微等待一下
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 4. cancel(boolean mayInterruptIfRunning)
// 嘗試取消任務的執(zhí)行。參數(shù)true表示如果任務已經(jīng)在運行,就中斷執(zhí)行它的線程。
System.out.println("嘗試取消任務3...");
boolean cancelResult = future3.cancel(true);
System.out.println("cancel(true) 方法返回: " + cancelResult);
// 5. isCancelled()
// 檢查任務是否已經(jīng)被成功取消
System.out.println("調(diào)用cancel后,檢查 isCancelled(): " + future3.isCancelled());
// 再次檢查isDone()。一個被取消的任務也被認為是“完成”的。
System.out.println("調(diào)用cancel后,檢查 isDone(): " + future3.isDone());
try {
// 試圖獲取一個已取消任務的結(jié)果,會拋出 CancellationException
System.out.println("嘗試 get() 已取消的任務3...");
String result3 = future3.get();
System.out.println("獲取到已取消任務的結(jié)果: " + result3);
} catch (CancellationException e) {
System.out.println("獲取結(jié)果失敗,因為任務已被取消! " + e.getClass().getName());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 關閉線程池,這是一個好習慣
executor.shutdownNow();
}
}
FutureTask類

基本特點
- FutureTask類實現(xiàn)了Runnable接口、Future接口(RunnableFuture接口)。
- FutureTask類除了實現(xiàn)接口的5+1個方法,還有兩個構造方法。
- FutureTask代表了一個可以取消的異步執(zhí)行。
- FutureTask可以用來包裝Runnable或者Callable。
- FutureTask可以提交到Executor中去執(zhí)行。
import java.util.concurrent.*;
public class FutureTaskFeaturesDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// =================================================================
// 特點 5: FutureTask 可以用來包裝 Runnable 或者 Callable
// =================================================================
// 創(chuàng)建一個 Callable 任務,它會模擬計算并返回一個結(jié)果
Callable<String> callableTask = () -> {
System.out.println("子線程 (來自Callable): 正在進行復雜的計算...");
Thread.sleep(2000); // 模擬耗時操作
return "計算完成,結(jié)果是ABC";
};
// 創(chuàng)建一個 Runnable 任務,它沒有返回值
Runnable runnableTask = () -> {
System.out.println("子線程 (來自Runnable): 正在執(zhí)行一個任務...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
// 使用 Callable 構造 FutureTask
FutureTask<String> futureTaskFromCallable = new FutureTask<>(callableTask);
// 使用 Runnable 構造 FutureTask,需要提供一個默認的返回結(jié)果
String defaultResult = "Runnable執(zhí)行完畢";
FutureTask<String> futureTaskFromRunnable = new FutureTask<>(runnableTask, defaultResult);
// =================================================================
// 特點 2 & 6: FutureTask 是一個實現(xiàn)了 Runnable 和 Future 的類,可以提交到 Executor 執(zhí)行
// =================================================================
// 因為 FutureTask 實現(xiàn)了 Runnable,所以它可以被線程池執(zhí)行
ExecutorService executor = Executors.newFixedThreadPool(2);
System.out.println("主線程: 將 futureTaskFromCallable 提交到線程池執(zhí)行。");
executor.submit(futureTaskFromCallable); // 特點6的演示
// =================================================================
// 特點 1 & 3: 代表可取消的異步計算,結(jié)果只能在執(zhí)行完成后才能獲取 (get() 會阻塞)
// =================================================================
System.out.println("主線程: 嘗試獲取 futureTaskFromCallable 的結(jié)果...");
// 在任務完成前,isDone() 返回 false (特點2的演示)
System.out.println("主線程: 任務完成了嗎? " + futureTaskFromCallable.isDone());
// 調(diào)用 get() 方法,主線程會在這里阻塞,直到子線程中的任務執(zhí)行完畢 (特點3的演示)
String result = futureTaskFromCallable.get(); // 阻塞點
System.out.println("主線程: 終于等到了結(jié)果 - " + result); // 特點1的演示
// 任務完成后,isDone() 返回 true
System.out.println("主線程: 任務現(xiàn)在完成了嗎? " + futureTaskFromCallable.isDone());
System.out.println("------------------------------------------");
// =================================================================
// 特點 4: 一旦執(zhí)行完成,則不能重新開始執(zhí)行,也不能取消
// =================================================================
System.out.println("主線程: 演示任務完成后的狀態(tài)。");
// 嘗試取消一個已經(jīng)完成的任務
boolean cancelResult = futureTaskFromCallable.cancel(true);
System.out.println("主線程: 嘗試取消已完成的任務,結(jié)果: " + cancelResult);
System.out.println("主線程: 已完成的任務被取消了嗎? " + futureTaskFromCallable.isCancelled());
// 再次提交已經(jīng)完成的任務到線程池
// 注意:這不會讓任務的 run() 方法被再次執(zhí)行。
// FutureTask 內(nèi)部有狀態(tài)控制,一旦任務完成(正常、異?;蛉∠臓顟B(tài)就不會再改變。
System.out.println("主線程: 再次提交同一個已完成的FutureTask...");
executor.submit(futureTaskFromCallable); // 提交是有效的,但任務的run方法不會再執(zhí)行
// 你可以再次 get(), 它會立刻返回之前已經(jīng)計算好的結(jié)果,而不會重新計算。
System.out.println("主線程: 再次get(),立即返回結(jié)果: " + futureTaskFromCallable.get());
System.out.println("------------------------------------------");
// 演示取消一個尚未完成的任務 (再次體現(xiàn)特點1)
FutureTask<String> longRunningTask = new FutureTask<>(() -> {
System.out.println("子線程 (長任務): 我要睡10秒,除非被中斷...");
Thread.sleep(10000);
return "我睡醒了";
});
executor.submit(longRunningTask);
Thread.sleep(100); // 確保任務已經(jīng)開始運行
System.out.println("主線程: 任務太慢了,決定取消它。");
longRunningTask.cancel(true); // 嘗試取消
System.out.println("主線程: 長任務被取消了嗎? " + longRunningTask.isCancelled());
System.out.println("主線程: 長任務算'完成'了嗎? " + longRunningTask.isDone());
try {
longRunningTask.get(); // 對已取消的任務調(diào)用get會拋出CancellationException
} catch (CancellationException e) {
System.out.println("主線程: 果然,獲取結(jié)果時拋出了 " + e.getClass().getSimpleName());
}
// 關閉線程池
executor.shutdown();
}
}
“不可重復執(zhí)行”特性(含源碼)
還有一點需要注意的就是,一旦執(zhí)行完成,就不能重新開始執(zhí)行,也不能取消。
FutureTask之所以再次執(zhí)行沒有效果,是因為它內(nèi)部設計了一個“一次性”的狀態(tài)機(State Machine)。一旦任務進入“完成”狀態(tài)(無論是正常結(jié)束、異常終止還是被取消),它的狀態(tài)就無法再回到“未開始”狀態(tài)。
示例如下:
錯誤的做法:
Callable<String> taskLogic = () -> "Hello"; FutureTask<String> myTask = new FutureTask<>(taskLogic); executor.submit(myTask); // 第一次執(zhí)行,有效 executor.submit(myTask); // 第二次提交同一個對象,無效
正確的做法:
Callable<String> taskLogic = () -> "Hello"; // 業(yè)務邏輯可以復用 // 第一次執(zhí)行 FutureTask<String> task1 = new FutureTask<>(taskLogic); executor.submit(task1); System.out.println(task1.get()); // 第二次執(zhí)行,必須創(chuàng)建新對象 FutureTask<String> task2 = new FutureTask<>(taskLogic); executor.submit(task2); System.out.println(task2.get());
為什么會有這種現(xiàn)象?下面對原理進行分析:
FutureTask被設計的首要目的不僅僅是“去執(zhí)行一個任務”,更是“持有(或代表)一個異步計算的結(jié)果”。
可以將其想象成一張一次性的彩票:
- 創(chuàng)建FutureTask: 買了一張彩票,彩票處于“未開獎”(
NEW)狀態(tài)。 - 執(zhí)行run()方法: 開獎過程開始了。這是彩票唯一一次被“使用”的機會。
- 執(zhí)行完成: 開獎結(jié)束,彩票狀態(tài)變?yōu)?ldquo;已開獎”(
NORMAL/EXCEPTIONAL),結(jié)果(中獎或未中獎)也已經(jīng)確定并記錄在彩票上。 - 調(diào)用get(): 隨時可以查看這張彩票的開獎結(jié)果。
- 再次執(zhí)行: 能用同一張已經(jīng)開過獎的彩票去參加下一輪的抽獎嗎?顯然不能。它的使命已經(jīng)在第一次開獎時完成了。
FutureTask也是如此。它的run()方法被設計為最多只執(zhí)行一次。一旦執(zhí)行完畢,它就從一個“任務執(zhí)行器”轉(zhuǎn)變?yōu)橐粋€“結(jié)果容器”。任何后續(xù)對get()的調(diào)用都會立刻返回已經(jīng)緩存的結(jié)果,而任何再次執(zhí)行它的嘗試都會被直接忽略。
FutureTask內(nèi)部通過一個state字段來管理其生命周期。這個狀態(tài)流轉(zhuǎn)是單向的,不可逆轉(zhuǎn)。
其主要狀態(tài)有:
NEW(0): 任務已創(chuàng)建,但尚未開始執(zhí)行。這是唯一可以開始執(zhí)行任務的狀態(tài)。COMPLETING(1): 任務正在執(zhí)行中,即將完成但結(jié)果還未設置。這是一個臨時的中間狀態(tài)。NORMAL(2): 任務已正常執(zhí)行完畢,結(jié)果已經(jīng)成功設置。EXCEPTIONAL(3): 任務執(zhí)行過程中拋出了異常,異常信息已被保存。CANCELLED(4): 任務在執(zhí)行完成前被取消。INTERRUPTING(5): 任務正在被中斷的過程中。INTERRUPTED(6): 任務已經(jīng)被成功中斷。
關鍵點:一旦狀態(tài)從 NEW 變?yōu)槿魏纹渌麪顟B(tài)(如NORMAL, EXCEPTIONAL, CANCELLED),就再也無法回到 NEW 狀態(tài)。
FutureTask的run()方法完美地體現(xiàn)了這種狀態(tài)檢查機制(以下為簡化后的邏輯):
public void run() {
// 1. 關鍵檢查:如果當前狀態(tài)不是 NEW,或者設置執(zhí)行線程失?。ㄕf明其他線程搶先執(zhí)行了),
// 則直接返回,不執(zhí)行任何操作。
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; // 獲取包裝的任務
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 2. 真正執(zhí)行任務
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 3. 如果有異常,設置異常結(jié)果,并將狀態(tài)變?yōu)?EXCEPTIONAL
setException(ex);
}
if (ran)
// 4. 如果正常完成,設置正常結(jié)果,并將狀態(tài)變?yōu)?NORMAL
set(result);
}
} finally {
// 清理工作
runner = null;
// ...
}
}
從源碼可以看出,run()方法的第一行就是一個防御性檢查。如果你拿著一個已經(jīng)執(zhí)行過(state不再是NEW)的FutureTask實例去提交給線程池,線程池調(diào)用它的run()方法時,這個檢查會直接失敗,方法立即return,任務的邏輯自然就不會被再次執(zhí)行了。
set()方法是狀態(tài)變更的開始。
// FutureTask.java
protected void set(V v) {
// 使用 CAS(Compare-And-Swap)原子地將狀態(tài)從 NEW 變?yōu)?COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// outcome 字段用于存儲最終結(jié)果
this.outcome = v;
// 將最終狀態(tài)設置為 NORMAL (正常完成)
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// *** 關鍵調(diào)用!進入收尾階段 ***
finishCompletion();
}
}
- 這里有兩次狀態(tài)變更:首先原子地從
NEW變成一個臨時的COMPLETING狀態(tài),這是為了防止并發(fā)的cancel()操作。 - 然后設置
outcome字段保存結(jié)果。 - 最后,將狀態(tài)設置為最終的
NORMAL,并調(diào)用finishCompletion()。
finishCompletion() —— 喚醒所有等待者,這個方法負責喚醒所有因調(diào)用get()而被阻塞的線程。
// FutureTask.java
private void finishCompletion() {
// 'waiters' 是一個單向鏈表,存儲了所有正在等待結(jié)果的線程 (WaitNode)。
for (WaitNode q; (q = waiters) != null;) {
// 使用 CAS 將 'waiters' 鏈表頭置為 null,確保這個喚醒過程只被執(zhí)行一次。
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 遍歷整個等待者鏈表
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// *** 喚醒! ***
// 使用 LockSupport.unpark() 喚醒那個正在等待的線程。
// 那個線程會從 get() -> awaitDone() 的阻塞中醒來。
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // help GC
q = next;
}
break;
}
}
// 做一些完成后的清理工作
done();
callable = null; // a micro-optimization
}
- 核心邏輯: 該方法會原子地“摘下”整個等待者鏈表 (
waiters)。 - 然后,它會遍歷這個剛剛摘下的鏈表,并對其中的每一個節(jié)點所代表的線程,執(zhí)行
LockSupport.unpark(t)。 - 被
unpark的線程之前正阻塞在awaitDone()方法里,現(xiàn)在它被喚醒,就可以從awaitDone()返回,接著從get()方法拿到outcome中存儲的結(jié)果,然后繼續(xù)執(zhí)行。
整個流程串起來就是:
- A線程調(diào)用
futureTask.get()。get()發(fā)現(xiàn)任務未完成(state為NEW),于是將A線程包裝成一個WaitNode節(jié)點,加入到waiters等待鏈表中,然后LockSupport.park()讓自身(A線程)掛起等待。 - B線程(線程池中的工作線程)開始執(zhí)行
futureTask.run()。 run()方法檢查狀態(tài)為NEW,通過檢查,開始執(zhí)行業(yè)務代碼。- 業(yè)務代碼執(zhí)行完畢,
run()調(diào)用set(result)。 set(result)將state從NEW最終改為NORMAL,把結(jié)果存入outcome,然后調(diào)用finishCompletion()。finishCompletion()遍歷waiters鏈表,找到代表A線程的節(jié)點,并調(diào)用LockSupport.unpark(A線程)。- A線程被喚醒,從
get()方法中返回,并讀取outcome字段中已經(jīng)準備好的結(jié)果。
因為state的狀態(tài)被永久地改變?yōu)榱?code>NORMAL,并且finishCompletion保證了所有等待者都被喚醒,這個FutureTask的使命就此終結(jié)。任何后續(xù)對run()的調(diào)用都會在第一步的if (state != NEW)檢查中失敗,從而實現(xiàn)了“一次性”的語義。
從原始FutureTask對象中取值(含源碼)
將一個已經(jīng)創(chuàng)建好的FutureTask對象提交給ExecutorService時,submit方法會返回一個新的、包裝了你原始任務的Future對象。而計算的真正結(jié)果,仍然存儲在原始的那個FutureTask對象里。調(diào)用這個新的包裝對象的get()方法,往往得到的是null,而不是想要的結(jié)果。
代碼示例:
import java.util.concurrent.*;
public class FutureTaskGetDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 創(chuàng)建一個 Callable,這是我們真正的業(yè)務邏輯
Callable<String> myCallable = () -> {
System.out.println("子線程: 正在執(zhí)行核心業(yè)務邏輯...");
Thread.sleep(1000);
return "這是真正的計算結(jié)果";
};
// 2. 創(chuàng)建我們自己的、原始的 FutureTask 對象
// 我們需要用這個對象來獲取最終結(jié)果
FutureTask<String> originalFutureTask = new FutureTask<>(myCallable);
ExecutorService executor = Executors.newSingleThreadExecutor();
// 3. 將我們原始的 FutureTask 提交給線程池
// submit(Runnable) 會返回一個新的 Future 對象,我們稱之為 wrapperFuture
Future<?> wrapperFuture = executor.submit(originalFutureTask);
// 4. 分別從兩個 Future 對象中獲取結(jié)果
// 錯誤的做法:嘗試從 submit 方法返回的 Future 中獲取結(jié)果
// 這個 get() 等待的是 wrapperFuture 的完成,其結(jié)果是 null
Object resultFromWrapper = wrapperFuture.get();
System.out.println("從 submit() 返回的 wrapperFuture.get() 拿到的結(jié)果是: " + resultFromWrapper);
System.out.println("wrapperFuture is done: " + wrapperFuture.isDone());
System.out.println("-------------------------------------------------");
// 正確的做法:從我們自己創(chuàng)建的原始 FutureTask 對象中獲取結(jié)果
// 這個 get() 獲取的是存儲在 originalFutureTask 內(nèi)部的真實結(jié)果
String resultFromOriginal = originalFutureTask.get();
System.out.println("從原始的 originalFutureTask.get() 拿到的結(jié)果是: " + resultFromOriginal);
System.out.println("originalFutureTask is done: " + originalFutureTask.isDone());
executor.shutdown();
}
}
運行結(jié)果:
子線程: 正在執(zhí)行核心業(yè)務邏輯...
從 submit() 返回的 wrapperFuture.get() 拿到的結(jié)果是: null
wrapperFuture is done: true
-------------------------------------------------
從原始的 originalFutureTask.get() 拿到的結(jié)果是: 這是真正的計算結(jié)果
originalFutureTask is done: true
從源碼的角度看這種原因:
第1步: 進入ExecutorService.submit()
FutureTask本身實現(xiàn)了Runnable接口,所以這里實際調(diào)用的是submit(Runnable task)。我們看AbstractExecutorService中的實現(xiàn):
// AbstractExecutorService.java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 關鍵!它調(diào)用了 newTaskFor 來創(chuàng)建一個新的 RunnableFuture 對象。
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 然后執(zhí)行這個新創(chuàng)建的任務
execute(ftask);
// 最后返回這個新創(chuàng)建的任務
return ftask;
}
初始的FutureTask被作為對象傳入,newTaskFor(originalFutureTask, null)被調(diào)用了。這個方法默認會返回一個新的FutureTask實例
所以,代碼等價于:RunnableFuture<Void> ftask = new FutureTask<Void>(originalFutureTask, null);
現(xiàn)在你就有了兩個FutureTask:
originalFutureTask: 你自己創(chuàng)建的,它包裝了myCallable,期望的結(jié)果是String類型。wrapperFuture(即ftask):submit方法內(nèi)部創(chuàng)建的,它包裝了originalFutureTask,期望的結(jié)果是null。
第2步: 任務執(zhí)行
線程池實際執(zhí)行的是外層的wrapperFuture。
wrapperFuture.run()被調(diào)用。wrapperFuture的Callable是什么?是originalFutureTask自己(因為它實現(xiàn)了Runnable)。所以,wrapperFuture的run方法內(nèi)部會調(diào)用originalFutureTask.run()。originalFutureTask.run()被調(diào)用。它會執(zhí)行最初的myCallable,計算出結(jié)果“真正的結(jié)果”。originalFutureTask執(zhí)行成功后,會調(diào)用set("真正的結(jié)果"),將結(jié)果保存在自己的outcome字段中。
第3步:finishCompletion()和關鍵的null操作
originalFutureTask在調(diào)用set()并最終進入finishCompletion()方法后,會執(zhí)行一項重要的清理工作。
// FutureTask.java
private void finishCompletion() {
// ... (喚醒等待者線程的代碼) ...
done(); // 這是一個空方法,留給子類擴展
// *** 最關鍵的一行 ***
// 為了幫助垃圾回收,將內(nèi)部的 callable 引用置為 null。
// 因為任務已經(jīng)執(zhí)行完了,理論上不再需要它了。
callable = null;
}
在originalFutureTask完成它的使命后,它扔掉了對myCallable的引用。它的outcome字段已經(jīng)安全地保存了結(jié)果“真正的結(jié)果”。
第4步: 結(jié)果返回
- 當
originalFutureTask.run()執(zhí)行完畢后,wrapperFuture的run()方法也隨之結(jié)束。 wrapperFuture也成功完成了,它會調(diào)用set(null)(因為創(chuàng)建它的時候newTaskFor(task, null)的第二個參數(shù)是null)。所以wrapperFuture的outcome字段保存的是null。
結(jié)論:
- 如果你調(diào)用
wrapperFuture.get(),你得到的是wrapperFuture的outcome,也就是null。 - 如果你調(diào)用
originalFutureTask.get(),你得到的是originalFutureTask的outcome,也就是 "真正的結(jié)果"。
新返回Future對象的作用
wrapperFuture的核心價值在于提供統(tǒng)一的控制和狀態(tài)管理接口,而不是為了傳遞結(jié)果。它是ExecutorService框架為了保持API一致性和健壯性而設計的關鍵一環(huán)。
任務取消是wrapperFuture最重要的用途之一。當你把任務提交給ExecutorService后,你手中唯一能直接操作的句柄就是submit方法返回的wrapperFuture。你需要通過它來嘗試取消任務的執(zhí)行。
當你在wrapperFuture上調(diào)用cancel()時,它會將這個取消請求傳播給內(nèi)部包裝的originalFutureTask。
FutureTask.cancel()方法會檢查任務狀態(tài),如果任務還沒開始跑,就將其狀態(tài)設置為CANCELLED。如果任務正在跑,它會根據(jù)你傳入的mayInterruptIfRunning參數(shù)來決定是否要中斷執(zhí)行該任務的線程。
import java.util.concurrent.*;
public class WrapperFutureCancelDemo {
public static void main(String[] args) throws InterruptedException {
FutureTask<String> originalFutureTask = new FutureTask<>(() -> {
System.out.println("子線程: 任務開始,準備睡5秒...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// cancel(true) 會讓線程在這里拋出中斷異常
System.out.println("子線程: 我被中斷了,任務提前結(jié)束!");
return "未完成的結(jié)果";
}
System.out.println("子線程: 任務正常完成。");
return "已完成的結(jié)果";
});
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交后,我們只能通過 wrapperFuture 來控制這個任務
Future<?> wrapperFuture = executor.submit(originalFutureTask);
// 讓主線程睡1秒,確保子任務已經(jīng)開始運行
Thread.sleep(1000);
System.out.println("主線程: 任務已經(jīng)跑了1秒,現(xiàn)在決定取消它。");
// 調(diào)用 wrapperFuture 的 cancel(true) 來中斷任務
boolean cancelled = wrapperFuture.cancel(true); // true表示如果任務正在運行,就中斷它
System.out.println("主線程: 任務取消成功了嗎? " + cancelled);
// isCancelled() 也會被傳播
System.out.println("主線程: wrapperFuture 的狀態(tài)是否是已取消? " + wrapperFuture.isCancelled());
System.out.println("主線程: originalFutureTask 的狀態(tài)是否是已取消? " + originalFutureTask.isCancelled());
executor.shutdown();
}
}
運行結(jié)果:
子線程: 任務開始,準備睡5秒...
主線程: 任務已經(jīng)跑了1秒,現(xiàn)在決定取消它。
子線程: 我被中斷了,任務提前結(jié)束!
主線程: 任務取消成功了嗎? true
主線程: wrapperFuture 的狀態(tài)是否是已取消? true
主線程: originalFutureTask 的狀態(tài)是否是已取消? true
在這個例子中,完全沒有用到get(),但wrapperFuture.cancel()成功地管理了任務的生命周期。
雖然wrapperFuture.get()返回的是null,但它依然是一個阻塞方法。它的作用是讓當前線程等待,直到任務執(zhí)行完成。這是一個非常重要的同步機制。
更重要的是,如果你的原始任務在執(zhí)行時拋出了異常,調(diào)用wrapperFuture.get()會重新拋出這個異常(包裝在ExecutionException中)。這使得主線程能夠捕獲并處理后臺任務的錯誤。
import java.util.concurrent.*;
public class WrapperFutureExceptionDemo {
public static void main(String[] args) {
FutureTask<String> originalFutureTask = new FutureTask<>(() -> {
System.out.println("子線程: 任務開始,即將拋出異常!");
throw new RuntimeException("計算出錯!");
});
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> wrapperFuture = executor.submit(originalFutureTask);
try {
System.out.println("主線程: 調(diào)用 wrapperFuture.get() 等待任務完成...");
// 這里會阻塞,直到任務結(jié)束。因為任務拋了異常,get()也會拋出異常。
wrapperFuture.get();
} catch (InterruptedException e) {
System.err.println("主線程: 等待時被中斷了。");
} catch (ExecutionException e) {
System.err.println("主線程: 成功捕獲到后臺任務的異常!");
System.err.println(" - 根本原因: " + e.getCause());
}
executor.shutdown();
}
}
運行結(jié)果:
子線程: 任務開始,即將拋出異常!
主線程: 調(diào)用 wrapperFuture.get() 等待任務完成...
主線程: 成功捕獲到后臺任務的異常!
- 根本原因: java.lang.RuntimeException: 計算出錯!
即便不關心返回值,wrapperFuture.get()在錯誤處理和流程同步上也是不可或缺的。
ExecutorService的設計哲學是提供一套統(tǒng)一、可預測的接口。
submit(Callable<T> task)返回Future<T>submit(Runnable task, T result)返回Future<T>submit(Runnable task)返回Future<?>
無論你提交什么類型的任務,submit方法總是返回一個Future對象。這讓使用者可以依賴一個統(tǒng)一的模型來管理所有異步任務,而不必寫if-else來判斷提交的是Callable還是Runnable。wrapperFuture (Future<?>)正是這個統(tǒng)一模型中,用于代表“無返回值任務”的那個標準占位符。
總結(jié)來說,新返回的Future對象能夠安全、統(tǒng)一地管理后臺任務,即便并不關心那個任務的返回值。
總結(jié)
到此這篇關于Java開發(fā)異步編程中常用的接口和類的文章就介紹到這了,更多相關Java異步編程接口和類內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot實現(xiàn)application.yml文件敏感信息加密
本文主要介紹了SpringBoot實現(xiàn)application.yml文件敏感信息加密,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-07-07
Spring boot框架下的RabbitMQ消息中間件詳解
這篇文章詳細介紹了Spring Boot框架下的RabbitMQ消息中間件的基本概念、消息傳輸模型、環(huán)境準備、Spring Boot集成以及消息生產(chǎn)和消費,感興趣的朋友跟隨小編一起看看吧2025-01-01
mybatis?plus?MetaObjectHandler?不生效的解決
今天使用mybatis-plus自動為更新和插入操作插入更新時間和插入時間,配置了MetaObjectHandler不生效,本文就來解決一下,具有一定的 參考價值,感興趣的可以了解一下2023-10-10
Springboot使用Maven占位符@替換不生效問題及解決
這篇文章主要介紹了Springboot使用Maven占位符@替換不生效問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-04-04
SpringBoot@DeleteMapping(/xxx/{id})請求報405的解決
這篇文章主要介紹了SpringBoot@DeleteMapping(/xxx/{id})請求報405的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01
java Socket無法完全接收返回內(nèi)容的解決方案
這篇文章主要介紹了java Socket無法完全接收返回內(nèi)容的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10

