C#并行編程之Task任務(wù)
任務(wù),基于線程池。其使我們對(duì)并行編程變得更簡單,且不用關(guān)心底層是怎么實(shí)現(xiàn)的。
System.Threading.Tasks.Task類是Task Programming Library(TPL)中最核心的一個(gè)類。
一、任務(wù)與線程
1:任務(wù)是架構(gòu)在線程之上的,也就是說任務(wù)最終還是要拋給線程去執(zhí)行。
2:任務(wù)跟線程不是一對(duì)一的關(guān)系,比如開10個(gè)任務(wù)并不是說會(huì)開10個(gè)線程,這一點(diǎn)任務(wù)有點(diǎn)類似線程池,但是任務(wù)相比線程池有很小的開銷和精確的控制。
我們用VS里面的“并行任務(wù)”看一看,快捷鍵Ctrl+D,K,或者找到“調(diào)試"->"窗口“->"并行任務(wù)“,我們在WaitAll方法處插入一個(gè)斷點(diǎn),最終我們發(fā)現(xiàn)任務(wù)確實(shí)托管給了線程。

二、初識(shí)Task
兩種構(gòu)建Task的方式,只是StartNew方法直接構(gòu)建出了一個(gè)Task之后又調(diào)用了其Start方法。
Task.Factory.StartNew (() =>
{
Console.WriteLine("Hello word!");
});
Task task =
new Task
(() =>
{
Console.WriteLine("Hello,Word!");
});
task.Start();在Task內(nèi)部執(zhí)行的內(nèi)容我們稱作為Task的Body,Task提供了多個(gè)初始化重載的方法。
public Task(Action action); public Task(Action<object> action, object state );給action傳參數(shù) public Task(Action action, CancellationToken cancellationToken); public Task(Action action, TaskCreationOptions creationOptions);
例如使用了重載方法的State參數(shù):
Task task2 = new Task((obj ) =>
{
Console.WriteLine("Message: {0}", obj); }, "Say \"Hello\" from task2");
task2.Start();
}補(bǔ)充細(xì)節(jié)
在創(chuàng)建Task的時(shí)候,Task有很多的構(gòu)造函數(shù)的重載,一個(gè)主要的重載就是傳入TaskCreateOptions的枚舉:
- TaskCreateOptions.None:用默認(rèn)的方式創(chuàng)建一個(gè)Task
- TaskCreateOptions.PreferFairness:請(qǐng)求scheduler盡量公平的執(zhí)行Task(Task和線程一樣,有優(yōu)先級(jí)的)
- TaskCreateOptions.LongRunning:聲明Task將會(huì)長時(shí)間的運(yùn)行。
- TaskCreateOptions.AttachToParent:因?yàn)門ask是可以嵌套的,所以這個(gè)枚舉就是把一個(gè)子task附加到一個(gè)父task中。
三、任務(wù)的結(jié)果
任務(wù)結(jié)束時(shí),它可以把一些有用的狀態(tài)信總寫到共享對(duì)象中。這個(gè)共享對(duì)象必須是線程安全的。
另一個(gè)方式是使用返回某個(gè)結(jié)果的任務(wù)。使用Task類的泛型版本,就可以定義返冋某個(gè)結(jié)果的任務(wù)的返回類型。
使用返回值的Result屬性可獲取是在一個(gè)Task運(yùn)行完成才會(huì)獲取的,所以task2是在task1運(yùn)行完成后,才開始運(yùn)行,也就是說上面的兩個(gè)result的值不管運(yùn)行多少次都是不會(huì)變的。其中我們也可以通過CurrentId來獲取當(dāng)前運(yùn)行的Task的編號(hào)。
var loop = 0;
var task1 = new Task<int>(() =>
{
for (var i = 0; i < 1000; i++)
loop += i;
return loop;
});
task1.Start();
var loopResut = task1.Result;
var task2 = new Task<long>(obj=>
{
long res = 0;
var looptimes = (int)obj;
for (var i = 0; i < looptimes; i++)
res += i;
return res;
},loopResut);
task2.Start();
var resultTask2 = task2.Result;
Console.WriteLine("任務(wù)1的結(jié)果':{0}\n任務(wù)2的結(jié)果:{1}", loopResut,resultTask2);.NET 4.5 :Task.Run
在 .NET Framework 4.5 及更高版本(包括 .NET Core 和 .NET Standard)中,使用靜態(tài) Task.Run 方法作為 TaskFactory.StartNew 的快捷方式。
Task.Run的跟Task.Factory.StarNew和new Task相差不多,不同的是前兩種是放進(jìn)線程池立即執(zhí)行,而Task.Run則是等線程池空閑后在執(zhí)行。

Run方法只接受無參的Action和Func委托,另外兩個(gè)接受一個(gè)object類型的參數(shù)。

在msdn中TaskFactory.StartNew的備注信息如下:

四、連續(xù)任務(wù)
所謂的延續(xù)的Task就是在第一個(gè)Task完成后自動(dòng)啟動(dòng)下一個(gè)Task。我們通過ContinueWith方法來創(chuàng)建延續(xù)的Task。我們假設(shè)有一個(gè)接受xml解析的服務(wù),首先從某個(gè)地方接受文件,然后解析入庫,最后發(fā)送是否解析正確的回執(zhí)。在每次調(diào)用ContinueWith方法時(shí),每次會(huì)把上次Task的引用傳入進(jìn)來,以便檢測上次Task的狀態(tài),比如我們可以使用上次Task的Result屬性來獲取返回值。
var ReceiveTask = new Task(() => ReceiveXml());
var ResolveTask = ReceiveTask .ContinueWith <bool>((r) => ResolveXml());
var SendFeedBackTask = ResolveTask.ContinueWith <string>((s) => SendFeedBack(s.Result));
ReceiveTask.Start();
Console.WriteLine(SendFeedBackTask.Result);上面的代碼我們也可以這么寫:
var SendFeedBackTask = Task.Factory.StartNew(() => ReceiveXml())
.ContinueWith<bool>(s => ResolveXml())
.ContinueWith<string>(r => SendFeedBack(r.Result));
Console.WriteLine(SendFeedBackTask.Result);無論前一個(gè)任務(wù)是如何結(jié)束的,前面的連續(xù)任務(wù)總是在前一個(gè)任務(wù)結(jié)束時(shí)啟動(dòng)。使用 TaskContinuationOptions枚舉中的值,可以指定,連續(xù)任務(wù)只有在起始任務(wù)成功(或失敗)結(jié)束吋啟動(dòng)??赡艿闹凳?OnlyOnFaulted、NotOoFaulted、Onl)OnCanceIed、NotOnCanceled 和 OnlyOnRanToCompletion
Task t5 = t1.ContinueWith(DoOnError, TaskContinuationOptions.OnlyOnFaulted);
五、分離嵌套任務(wù)
有些情況下我們需要?jiǎng)?chuàng)建嵌套的Task,嵌套里面又分為分離的和不分離的。其創(chuàng)建的方式很簡單,就是在Task的body里面創(chuàng)建一個(gè)新的Task。如果新的Task未指定AttachedToParent選項(xiàng),那么就是分離嵌套的。我們看下面這段代碼。下面的代碼中outTask.Wait()表示等待outTask執(zhí)行完成。
var outTask = Task.Factory.StartNew(() =>
{
Console.WriteLine("Outer task beginning...");
var childTask = Task.Factory.StartNew(() =>
{
Thread.SpinWait(3000000);
Console.WriteLine("Detached nested task completed.");
});
});
outTask.Wait();
Console.WriteLine("Outer task completed.");
Console.ReadKey();我們可以看到運(yùn)行結(jié)果是:

六、子任務(wù)
我們將上面的代碼加上TaskCreationOptions選項(xiàng):
如果父任務(wù)在子任務(wù)之前結(jié)束,父任務(wù)的狀態(tài)就顯示為WaitingForChildrenToComplete。只要子任務(wù)也結(jié)束時(shí),父任務(wù)的狀態(tài)就變成RanToCompletion。.、當(dāng)然,如果父任務(wù)用TaskCreatiooOptions 枚舉中的DetachedFromParent創(chuàng)建子任務(wù)時(shí),這就無效。
var outTask = Task.Factory.StartNew(() =>
{
Console.WriteLine("Outer task beginning...");
var childTask = Task.Factory.StartNew(() =>
{
Thread.SpinWait(3000000);
Console.WriteLine("Detached nested task completed.");
},TaskCreationOptions.AttachedToParent);
});
outTask.Wait();
Console.WriteLine("Outer task completed.");看到運(yùn)行結(jié)果:

七、取消任務(wù)
在4.0中給我們提供一個(gè)“取消標(biāo)記”叫做CancellationTokenSource.Token,在創(chuàng)建task的時(shí)候傳入此參數(shù),就可以將主線程和任務(wù)相關(guān)聯(lián)。我們通過cancellation的tokens來取消一個(gè)Task。
有點(diǎn)要特別注意的,當(dāng)我們調(diào)用了Cancel()方法之后,.NET Framework不會(huì)強(qiáng)制性的去關(guān)閉運(yùn)行的Task。我們自己必須去檢測之前在創(chuàng)建Task時(shí)候傳入的那個(gè)CancellationToken。
一旦cancel被調(diào)用,task將會(huì)拋出OperationCanceledException來中斷此任務(wù)的執(zhí)行,最后將當(dāng)前task的Status的IsCanceled屬性設(shè)為true。
1、在很多Task的Body里面包含循環(huán),我們可以在輪詢的時(shí)候判斷IsCancellationRequested屬性是否為True,如果是True的話,就可以停止循環(huán)以及釋放資源,同時(shí)拋出OperationCanceledException異常出來。
2、或者在任務(wù)中設(shè)置“取消信號(hào)“叫做ThrowIfCancellationRequested,來等待主線程使用Cancel來通知。
3、檢測task是否被cancel就是調(diào)用CancellationToken.WaitHandle屬性。CancellationToken的WaitOne()方法會(huì)阻止task的運(yùn)行,只有CancellationToken的cancel()方法被調(diào)用后,這種阻止才會(huì)釋放。
var cts = new CancellationTokenSource();
var ct = cts.Token;
var task = Task.Factory.StartNew(() =>
{
for (var i = 0; i < 10000000; i++)
{
if (ct.IsCancellationRequested)
{
Console.WriteLine("任務(wù)開始取消...");
throw new OperationCanceledException(ct);
}
//或者直接在檢測到異常時(shí),扔出異常: token.ThrowIfCancellationRequested();
//或者等待 WaitHandle: token.WaitHandle.WaitOne();
}
},ct);//傳入CancellationToken作為Task第二個(gè)參數(shù)
ct.Register(() =>
{
Console.WriteLine("已經(jīng)取消");
});
Thread.Sleep(5000);
cts.Cancel();//如果想要取消一個(gè)Task的運(yùn)行,只要調(diào)用CancellationToken實(shí)例的Cancel()方法就可以了。
try
{
task.Wait();
}
catch (AggregateException e)
{
foreach (var v in e.InnerExceptions)
Console.WriteLine("msg: " + v.Message);
}八、休眠:等待時(shí)間執(zhí)行
在TPL中我們可以通過三種方式進(jìn)行等待,一是通過CancellTaken的WaitHanle進(jìn)行等待、第二種則是通過傳統(tǒng)的Tread.Sleep方法、第三種則通過Thread.SpainWait方法。
1、CancellToken方式:每次我們等待十秒鐘之后,再進(jìn)行下次輸出。
有一點(diǎn)要注意:WaitOne()方法只有在設(shè)定的時(shí)間間隔到了,或者Cancel方法被調(diào)用,此時(shí)task才會(huì)被喚醒。如果如果cancel()方法被調(diào)用而導(dǎo)致task被喚醒,那么CancellationToken.WaitHandle.WaitOne()方法就會(huì)返回true,如果是因?yàn)樵O(shè)定的時(shí)間到了而導(dǎo)致task喚醒,那么CancellationToken.WaitHandle.WaitOne()方法返回false。
var cts = new CancellationTokenSource();
var ct = cts.Token;
var task = new Task(() =>
{
for (var i = 0; i < 100000; i++)
{
var cancelled = ct.WaitHandle.WaitOne(1000 );
Console.WriteLine(" {0}. Cancelled? {1}", i, cancelled);
if (cancelled)
{
throw new OperationCanceledException(ct);
}
}
}, ct);
task.Start();
2、上面的功能如果我們要是通過Tread.Sleep方式實(shí)現(xiàn):
var task = new Task(() =>
{
for (var i = 0; i < 100000; i++)
{
Thread.Sleep(10000);
var cancelled =ct.IsCancellationRequested;
Console.WriteLine(" {0}. Cancelled? {1}",
i, cancelled); if (cancelled)
{
throw new OperationCanceledException(ct);
}
}
},ct);3、Thread.SpainWait則跟上面兩種方式完全不同,上面的兩種方式都是會(huì)在線程調(diào)度程序不考慮改線程,直等到運(yùn)行結(jié)束。而Thread.SpainWait的作用實(shí)質(zhì)上會(huì)將處理器置于十分緊密的循環(huán)中,主要的作用是來實(shí)現(xiàn)同步鎖的作用。并不常用,大部分情況下我們可以通過Lock的方式來實(shí)現(xiàn)。
Thread.SpinWait(10000);
九、等待任務(wù)執(zhí)行
在很多時(shí)候我們也許需要等待同時(shí)開啟的幾個(gè)線程完成之后再來做其他事,在TPL中提供了幾種方式來等待任務(wù)執(zhí)行。Task.Wait等待單個(gè)任務(wù)完成;Task.WaitAll等待所有的Task完成、TaskAny等在其中的任何一個(gè)或則多個(gè)任務(wù)完成。
1、Task.Wait: 等待單獨(dú)的一個(gè)Task執(zhí)行完成
共有5個(gè)重載:Wait()、Wait(CancellToken)、Wait(Int32)、Wait(TimeSpan)、Wait(TimeSpan、CancellToken)。各個(gè)重載方法的含義:
- 1)Wait():等待整個(gè)任務(wù)完成或者取消或者出現(xiàn)異常;
- 2)Wait(CancellToken):等待任務(wù)直到CancellToken調(diào)用取消或者完成,或者出現(xiàn)異常;
- 3)Wait(Int32):等待任務(wù),未完成則到指定的時(shí)間;
- 4)Wait(TimeSpan):同上;
- 5)Wait(TimeSpan、CancellToken):等待任務(wù)到指定時(shí)間,或者CancellToken調(diào)用取消或者任務(wù)完成。
static void Main(string[] args)
{
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
Task task = createTask(token,6); task.Start();
Console.WriteLine("Wait() complete.");
task.Wait();
Console.WriteLine("Task Completed.");
task = createTask(token,3);
task.Start();
Console.WriteLine("Wait(2) secs for task to complete.");
bool completed = task.Wait(2000);
Console.WriteLine("Wait ended - task completed: {0}", completed);
task = createTask(token,4);
task.Start();
Console.WriteLine("Wait(2,token) for task to complete.");
completed = task.Wait(2000, token);
Console.WriteLine("Wait ended - task completed: {0} task cancelled {1}",
completed, task.IsCanceled);
Console.WriteLine("Main method complete. Press enter to finish.");
Console.ReadLine();
}
static Task createTask(CancellationToken token,int loop)
{
return new Task(() =>
{
for (int i = 0; i < loop; i++)
{
token.ThrowIfCancellationRequested();
Console.WriteLine("Task - Int value {0}", i);
token.WaitHandle.WaitOne(1000);
}
}, token);
}循環(huán)都會(huì)等待1秒鐘,這樣我們可以看看Wait(2000)的效果,看看運(yùn)行后的效果:

從上面的例子可以看出,wait方法子task執(zhí)行完成之后會(huì)返回true。
注意:當(dāng)在執(zhí)行的task內(nèi)部拋出了異常之后,這個(gè)異常在調(diào)用wait方法時(shí)會(huì)被再次拋出。后面再"異常處理篇"會(huì)講述。
2、Task.WaitAll方法: 等待多個(gè)task
是等待所有的任務(wù)完成,也有5個(gè)重載, 也可以傳遞時(shí)間以及Token參數(shù),進(jìn)行等待時(shí)間以及取消Token的控制。
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
var task1 = createTask(token,2);
var task2 = createTask(token, 5);
task1.Start();
task2.Start();
Console.WriteLine("Waiting for tasks to complete.");
Task.WaitAll(task1, task2);
Console.WriteLine("Tasks Completed.");
注意:如果在等在的多個(gè)task之中,有一個(gè)task拋出了異常,那么調(diào)用WaitAll()方法時(shí)就會(huì)拋出異常。
ContinueWith結(jié)合WaitAll來玩一把
當(dāng)這兩者結(jié)合起來,我們就可以玩一些復(fù)雜一點(diǎn)的東西,比如說現(xiàn)在有4個(gè)任務(wù),其中t1需要串行,t2-t3可以并行,t4需要串行.

ConcurrentStack<int> stack = new ConcurrentStack<int>();
//t1先執(zhí)行
var t1 = Task.Factory.StartNew(() =>
{
stack.Push(1);
stack.Push(2);
});
//t2,t3并行執(zhí)行
var t2 = t1.
ContinueWith
(t =>
{
int result;
stack.TryPop(out result);
});
//t2,t3并行執(zhí)行
var t3 = t1.
ContinueWith
(t =>
{
int result;
stack.TryPop(out result);
});
//等待t2和t3執(zhí)行完
Task.WaitAll(t2, t3);
//t4z再執(zhí)行
var t4 = Task.Factory.StartNew(() =>
{
Console.WriteLine("當(dāng)前集合元素個(gè)數(shù):" + stack.Count);
});3、Task.WaitAny
等待任何一個(gè)任務(wù)完成,完成之后返回其完成的任務(wù)的Index:
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
var task1 = createTask(token,2);
var task2 = createTask(token, 5);
task1.Start();
task2.Start();
Console.WriteLine("Waiting for tasks to complete.");
var index = Task.WaitAny(task1, task2);
Console.WriteLine("Tasks Completed.Index is {0}",index);
十、異常處理
在TPL中,異常的觸發(fā)器主要是這幾個(gè):
Task.Wait(), Task.WaitAll(), Task,WaitAny(),Task.Result。而在TPL出現(xiàn)的異常都會(huì)以AggregateException的示例拋出,我們在進(jìn)行基本的異常處理時(shí),可以通過查看AggregateException的InnerExceptions來進(jìn)行內(nèi)部異常的捕獲:
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var task1 = new Task(() =>
{
throw new NullReferenceException() {
Source
="task1"};
});
var task2 = new Task(() =>
{
throw new ArgumentNullException("a", "a para can not be null") { Source="task2"};
});
task1.Start(); task2.Start();
try
{
Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
foreach (Exception inner in ex.InnerExceptions)
{
Console.WriteLine("Exception type {0} from {1}",
inner.GetType(), inner.Source);
}
}同時(shí),我們還可以通過Task的幾個(gè)屬性來判斷Task的狀態(tài),如:IsCompleted, IsFaulted, IsCancelled,Exception。
另外,AggregateException中還提供了Handle方法來給我們方法來給我們處理每個(gè)內(nèi)部 異常,每個(gè)異常發(fā)生時(shí)都會(huì)調(diào)用Handle傳入的delegate ,同時(shí)我們需要通過返回True,False來告訴異常是否已經(jīng)被處理,比如對(duì)于OperationCanceledException我們知道是取消了Task,是肯定可以處理的:
try
{
Task.WaitAll(task1, task2, task3, task4);
}
catch(AggregateException ex)
{
ex.Handle((e) =>
{
if (e is OperationCanceledException)
{
return true;
}
else
{
return false;
}
});
}十一、執(zhí)行晚加載的Task(Lazily Task)
晚加載,或者又名延遲初始化,主要的好處就是避免不必要的系統(tǒng)開銷。在并行編程中,可以聯(lián)合使用Lazy變量和Task<>.Factory.StartNew()做到這點(diǎn)。(Lazy變量時(shí).NET 4中的一個(gè)新特性,這里大家不用知道Lazy的具體細(xì)節(jié))。
Lazy變量只有在用到的時(shí)候才會(huì)被初始化。所以我們可以把Lazy變量和task的創(chuàng)建結(jié)合:只有這個(gè)task要被執(zhí)行的時(shí)候才去初始化。
// do the same thing in a single statement
Lazy<Task<string>> lazyData2 = new Lazy<Task<string>>(
() => Task<string>.Factory.StartNew(() =>
{
Console.WriteLine("Task body working...");
return "Task Result";
}));
Console.WriteLine("Calling second lazy variable");
Console.WriteLine("Result from task: {0}", lazyData2.Value.Result);首先我們回想一下,在之前的系列文章中我們是怎么定義一個(gè)task的:直接new,或者通過task的factory來創(chuàng)建,因?yàn)閯?chuàng)建task的代碼是在main函數(shù)中的,所以只要new了一個(gè)task,那么這個(gè)task就被初始化。現(xiàn)在如果用了Lazy的task,那么現(xiàn)在我們初始化的就是那個(gè)Lazy變量了,而沒有初始化task,(初始化Lazy變量的開銷小于初始化task),只有當(dāng)調(diào)用了lazyData.Value時(shí),Lazy變量中包含的那個(gè)task才會(huì)初始化。
到此這篇關(guān)于C#并行編程之Task任務(wù)的文章就介紹到這了。希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Unity實(shí)現(xiàn)輪盤方式的按鈕滾動(dòng)效果
這篇文章主要為大家詳細(xì)介紹了Unity實(shí)現(xiàn)輪盤方式的按鈕滾動(dòng)效果,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-02-02
C#開發(fā)Windows UWP系列之布局面板RelativePanel
這篇文章介紹了C#開發(fā)Windows UWP系列之布局面板RelativePanel,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06
C# winfrom異步加載數(shù)據(jù)不影響窗體UI的操作方法
這篇文章主要介紹了C# winfrom 異步加載數(shù)據(jù)不影響窗體UI,在 WinForms 應(yīng)用程序中,如果數(shù)據(jù)加載是一個(gè)比較耗時(shí)的操作,直接在主線程中進(jìn)行加載會(huì)導(dǎo)致 UI 凍結(jié),這是因?yàn)?nbsp;WinForms 的 UI 是單線程的,主線程被阻塞時(shí)就無法處理其他 UI 相關(guān)的任務(wù)2024-12-12
C#調(diào)用usb攝像頭的實(shí)現(xiàn)方法
這篇文章主要介紹了C#調(diào)用usb攝像頭的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02
C# OCR實(shí)現(xiàn)文字識(shí)別功能
OCR,中文叫做光學(xué)字符識(shí)別。它是利用光學(xué)技術(shù)和計(jì)算機(jī)技術(shù)把印在或?qū)懺诩埳系奈淖肿x取出來,并轉(zhuǎn)換成一種計(jì)算機(jī)能夠接受、人又可以理解的格式。本文將利用OCR實(shí)現(xiàn)文字識(shí)別功能,感興趣的可以了解一下2022-11-11
淺析C#?AsyncLocal如何在異步間進(jìn)行數(shù)據(jù)流轉(zhuǎn)
在異步編程中,處理異步操作之間的數(shù)據(jù)流轉(zhuǎn)是一個(gè)比較常用的操作,C#異步編程提供了一個(gè)強(qiáng)大的工具來解決這個(gè)問題,那就是AsyncLocal,下面我們就來看看AsyncLocal的原理和用法吧2023-08-08

