Java中的FutureTask源碼解析
一、簡介
1、FutureTask是一個可取消的異步計算。這個類是Future的實現(xiàn)類,有開始和取消一個計算的方法,如果一個計算已經完成可以查看結果。如果在計算沒有完成的情況下調用get獲取計算結果會阻塞。且一旦任務完成后,計算不能重新開始或被取消,除非計算被runAndReset調用執(zhí)行。
2、FutureTask被用來去封裝一個Callable或者Runnable,一個FutureTask能夠被submit作為一個Executor
3、FutureTask 的線程安全由CAS來保證。
二、源碼分析
1、成員屬性
public class FutureTask<V> implements RunnableFuture<V> {
//state表示的任務的狀態(tài)
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
//任務
private Callable<V> callable;
//存儲任務完成以后的結果
private Object outcome;
//執(zhí)行當前任務的線程
private volatile Thread runner;
//執(zhí)行當前任務被阻塞的線程
private volatile WaitNode waiters;
}可能有的狀態(tài)轉換:
NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED
注意:state用volatile修飾的,如果在多線程并發(fā)的情況下,某一個線程改變了任務的狀態(tài),其他線程都能夠立馬知道,保證了state字段的可見性。
2、構造函數(shù)
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}很好的詮釋了FutureTask封裝了Runnable或Callable,構造完成后將任務的狀態(tài)變?yōu)镹EW。同時注意,封裝Runnable時用的Executors的靜態(tài)方法callable
順帶看下Executors.callable()這個方法,這個方法的功能是把Runnable轉換成Callable,代碼如下:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}所以,F(xiàn)utureTask封裝Runnable使用了適配器模式的設計模式
3、核心方法
//運行任務的方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; //得到當前任務
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //當前任務調用call方法,執(zhí)行,同時,執(zhí)行完后將結果返回
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran) //表示任務執(zhí)行成功
set(result); //CAS改變任務的狀態(tài)從NEW->COMPLETING->NORMAL,同時將任務返回的結果保存到outcome屬性中,再移除并喚醒所有等待線程
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; //將任務成功執(zhí)行完后返回的結果保存到outcome中
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終的狀態(tài),表示任務結束
finishCompletion(); //移除并喚醒所有等待線程
}
}
//該方法用于移除并喚醒所有等待線程
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); //喚醒
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null;
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt(); //打斷
} finally { // 設置成為最終態(tài)INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion(); //移除并喚醒所有等待線程
}
return true;
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); //如果任務沒有完成或者其他的問題,將阻塞;創(chuàng)建一個新節(jié)點存入阻塞棧中
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}三、示例
常用使用方式:
- 第一種方式: Future + ExecutorService
- 第二種方式: FutureTask + ExecutorService
- 第三種方式: FutureTask + Thread
第一種方式:Future + ExecutorService
public class FutureDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Future future = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
Long start = System.currentTimeMillis();
while (true) {
Long current = System.currentTimeMillis();
if ((current - start) > 1000) {
return 1;
}
}
}
});
try {
Integer result = (Integer)future.get();
System.out.println(result);
}catch (Exception e){
e.printStackTrace();
}
}
}第二種方式:FutureTask + ExecutorService
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executor.submit(futureTask);第三種方式:FutureTask + Thread
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
Thread thread = new Thread(futureTask);
thread.setName("Task thread");
thread.start();四、總結
1、FutureTask用來封裝Runnable或者Callable接口,可以當成一個任務。
2、在Java并發(fā)程序中FutureTask表示一個可以取消的異步運算。它有啟動和取消運算、查詢運算是否完成和取回運算結果等方法。只有當運算完成的時候結果才能取回,如果運算尚未完成get方法將會阻塞。一個FutureTask對象可以對調用了Callable和Runnable的對象進行包裝,由于FutureTask也是調用了Runnable接口所以它可以提交給Executor來執(zhí)行。
3、FutureTask可用于異步獲取執(zhí)行結果或取消執(zhí)行任務的場景,通過傳入Runnable或者Callable的任務給FutureTask,直接調用其run方法或者放入線程池執(zhí)行,之后可以在外部通過FutureTask的get方法異步獲取執(zhí)行結果,因此,F(xiàn)utureTask非常適合用于耗時的計算,主線程可以在完成自己的任務后,再去獲取結果。另外,F(xiàn)utureTask還可以確保即使調用了多次run方法,它都只會執(zhí)行一次Runnable或者Callable任務,或者通過cancel取消FutureTask的執(zhí)行等。
4、FutureTask間接繼承了Runnable和Callable
5、FutureTask的線程安全由CAS操作來保證
6、FutureTask結果返回機制 :只有任務成功執(zhí)行完成后,通過get方法能夠得到任務返回的結果,其他情況都會導致阻塞。
到此這篇關于Java中的FutureTask源碼解析的文章就介紹到這了,更多相關FutureTask源碼解析內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
在CentOS上安裝Java 17并實現(xiàn)多版本共存的詳細教程
在現(xiàn)代軟件開發(fā)中,Java 作為一種廣泛使用的編程語言,其版本更新頻繁,不同項目可能依賴不同版本的 Java 運行環(huán)境,CentOS 作為一款流行的 Linux 發(fā)行版,常被用于服務器部署和開發(fā)環(huán)境,本文將詳細介紹如何在 CentOS 上安裝 Java 17,并實現(xiàn)與現(xiàn)有 Java 8 的多版本共存2025-03-03
SpringKafka消息發(fā)布之KafkaTemplate與事務支持功能
通過本文介紹的基本用法、序列化選項、事務支持、錯誤處理和性能優(yōu)化技術,開發(fā)者可以構建高效可靠的Kafka消息發(fā)布系統(tǒng),事務支持特性尤為重要,它確保了在分布式環(huán)境中的數(shù)據(jù)一致性,感興趣的朋友一起看看吧2025-04-04

