運(yùn)用示例簡(jiǎn)單講解C#取消令牌CancellationTokenSource
前言
相信大家在使用C#進(jìn)行開發(fā)的時(shí)候,特別是使用異步的場(chǎng)景,多多少少會(huì)接觸到CancellationTokenSource。看名字就知道它和取消異步任務(wù)相關(guān)的,而且一看便知大名鼎鼎的CancellationToken就是它生產(chǎn)出來的。不看不知道,一看嚇一跳。它在取消異步任務(wù)、異步通知等方面效果還是不錯(cuò)的,不僅好用而且夠強(qiáng)大。無論是微軟底層類庫(kù)還是開源項(xiàng)目涉及到Task相關(guān)的,基本上都能看到它的身影,而微軟近幾年也是很重視框架中的異步操作,特別是在.NET Core上基本上能看到Task的地方就能看到CancellationTokenSource的身影。這次我們抱著學(xué)習(xí)的態(tài)度,來揭開它的神秘面紗。
簡(jiǎn)單示例
相信對(duì)于CancellationTokenSource基本的使用,許多同學(xué)已經(jīng)非常熟悉了。不過為了能夠讓大家?guī)胛恼碌墓?jié)奏,我們還是打算先展示幾個(gè)基礎(chǔ)的操作,讓大家找找感覺,回到那個(gè)熟悉的年代。
基礎(chǔ)操作
首先呈現(xiàn)一個(gè)最基礎(chǔ)的操作。
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("取消了???"));
cancellationToken.Register(() => System.Console.WriteLine("取消了?。?!"));
cancellationToken.Register(state => System.Console.WriteLine($"取消了。。。{state}"),"啊啊啊");
System.Console.WriteLine("做了點(diǎn)別的,然后取消了.");
cancellationTokenSource.Cancel();
這個(gè)操作是最簡(jiǎn)單的操作,我們上面提到過CancellationTokenSource就是用來生產(chǎn)CancellationToken的,還可以說CancellationToken是CancellationTokenSource的表現(xiàn),這個(gè)待會(huì)看源碼的時(shí)候我們會(huì)知道為啥這么說。這里呢我們給CancellationToken注冊(cè)幾個(gè)操作,然后使用CancellationTokenSource的Cancel方法取消操作,這時(shí)候控制臺(tái)就會(huì)打印結(jié)果如下
做了點(diǎn)別的,然后取消了.
取消了。。。啊啊啊
取消了!?。?br /> 取消了???
通過上面簡(jiǎn)單的示例,大家應(yīng)該非常輕松的理解了它的簡(jiǎn)單使用。
定時(shí)取消
有的時(shí)候呢我們可能需要超時(shí)操作,比如我不想一直等著,到了一個(gè)固定的時(shí)間我就要取消操作,這時(shí)候我們可以利用CancellationTokenSource的構(gòu)造函數(shù)給定一個(gè)限定時(shí)間,過了這個(gè)時(shí)間CancellationTokenSource就會(huì)被取消了,操作如下
//設(shè)置3000毫秒(即3秒)后取消
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(3000);
CancellationToken cancellationToken = cancellationTokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("我被取消了."));
System.Console.WriteLine("先等五秒鐘.");
await Task.Delay(5000);
System.Console.WriteLine("手動(dòng)取消.")
cancellationTokenSource.Cancel();
然后在控制臺(tái)打印的結(jié)果是這個(gè)樣子的,活脫脫的為我們實(shí)現(xiàn)了內(nèi)建的超時(shí)操作。
先等五秒鐘.
我被取消了.
手動(dòng)取消.
上面的寫法是在構(gòu)造CancellationTokenSource的時(shí)候設(shè)置超時(shí)等待,還有另一種寫法等同于這種寫法,使用的是CancelAfter方法,具體使用如下
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Token.Register(() => System.Console.WriteLine("我被取消了."));
//五秒之后取消
cancellationTokenSource.CancelAfter(5000);
System.Console.WriteLine("不會(huì)阻塞,我會(huì)執(zhí)行.");
這個(gè)操作也是定時(shí)取消操作,需要注意的是CancelAfter方法并不會(huì)阻塞執(zhí)行,所以打印的結(jié)果是
不會(huì)阻塞,我會(huì)執(zhí)行.
我被取消了.
關(guān)聯(lián)取消
還有的時(shí)候是這樣的場(chǎng)景,就是我們?cè)O(shè)置一組關(guān)聯(lián)的CancellationTokenSource,我們期望的是只要這一組里的任意一個(gè)CancellationTokenSource被取消了,那么這個(gè)被關(guān)聯(lián)的CancellationTokenSource就會(huì)被取消。說得通俗一點(diǎn)就是,我們幾個(gè)當(dāng)中只要一個(gè)不在了,那么你也可以不在了,具體的實(shí)現(xiàn)方式是這樣的
//聲明幾個(gè)CancellationTokenSource
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationTokenSource tokenSource2 = new CancellationTokenSource();
CancellationTokenSource tokenSource3 = new CancellationTokenSource();
tokenSource2.Token.Register(() => System.Console.WriteLine("tokenSource2被取消了"));
//創(chuàng)建一個(gè)關(guān)聯(lián)的CancellationTokenSource
CancellationTokenSource tokenSourceNew = CancellationTokenSource.CreateLinkedTokenSource(tokenSource.Token, tokenSource2.Token, tokenSource3.Token);
tokenSourceNew.Token.Register(() => System.Console.WriteLine("tokenSourceNew被取消了"));
//取消tokenSource2
tokenSource2.Cancel();
上述示例中因?yàn)閠okenSourceNew關(guān)聯(lián)了tokenSource、tokenSource2、tokenSource3所以只要他們其中有一個(gè)被取消那么tokenSourceNew也會(huì)被取消,所以上述示例的打印結(jié)果是
tokenSourceNew被取消了
tokenSource2被取消了
判斷取消
上面我們使用的方式,都是通過回調(diào)的方式得知CancellationTokenSource被取消了,沒辦法通過標(biāo)識(shí)去得知CancellationTokenSource是否可用。不過微軟貼心的為我們提供了IsCancellationRequested屬性去判斷,需要注意的是它是CancellationToken的屬性,具體使用方式如下
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = tokenSource.Token;
//打印被取消
cancellationToken.Register(() => System.Console.WriteLine("被取消了."));
//模擬傳遞的場(chǎng)景
Task.Run(async ()=> {
while (!cancellationToken.IsCancellationRequested)
{
System.Console.WriteLine("一直在執(zhí)行...");
await Task.Delay(1000);
}
});
//5s之后取消
tokenSource.CancelAfter(5000);
上述代碼五秒之后CancellationTokenSource被取消,因此CancellationTokenSource的Token也會(huì)被取消。反映到IsCancellationRequested上就是值為true說明被取消,為false說明沒被取消,因此控制臺(tái)輸出的結(jié)果是
一直在執(zhí)行...
一直在執(zhí)行...
一直在執(zhí)行...
一直在執(zhí)行...
一直在執(zhí)行...
被取消了.
還有另一種方式,也可以主動(dòng)判斷任務(wù)是否被取消,不過這種方式簡(jiǎn)單粗暴,直接是拋出了異常。如果是使用異步的方式的話,需要注意的是Task內(nèi)部異常的捕獲方式,否則對(duì)外可能還沒有感知到具體異常的原因,它的使用方式是這樣的,這里為了演示方便我直接換了一種更直接的方式
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = tokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("被取消了."));
tokenSource.CancelAfter(5000);
while (true)
{
//如果操作被取消則直接拋出異常
cancellationToken.ThrowIfCancellationRequested();
System.Console.WriteLine("一直在執(zhí)行...");
await Task.Delay(1000);
}
執(zhí)行五秒之后則直接拋出 System.OperationCanceledException: The operation was canceled.異常,異步情況下注意異常處理的方式即可。通過上面這些簡(jiǎn)單的示例,相信大家對(duì)CancellationTokenSource有了一定的認(rèn)識(shí),大概知道了在什么時(shí)候可以使用它,主要是異步取消通知,或者限定時(shí)間操作通知等等。CancellationTokenSource是個(gè)不錯(cuò)的神器,使用簡(jiǎn)單功能強(qiáng)大。
源碼探究
通過上面的示例,相信大家對(duì)CancellationTokenSource有了一個(gè)基本的認(rèn)識(shí),真的是非常強(qiáng)大,而且使用起來也非常的簡(jiǎn)單,這也是c#語言的精妙之處,非常實(shí)用,讓你用起來的時(shí)候非常舒服,有種用著用著就想跪下的沖動(dòng)。步入正題,接下來讓我們來往深處看看CancellationTokenSource的源碼,看看它的工作機(jī)制是啥。本文貼出的源碼是博主精簡(jiǎn)過的,畢竟源碼太多不太可能全部粘貼出來,主要是跟著它的思路了解它的工作方式。
構(gòu)造入手
因?yàn)檫@一次呢CancellationTokenSource的初始化函數(shù)中有一個(gè)比較重要的構(gòu)造函數(shù),那就是可以設(shè)置定時(shí)超時(shí)的操作,那么我們就從它的構(gòu)造函數(shù)入手[點(diǎn)擊查看源碼👈]
//全局狀態(tài)
private volatile int _state;
//未取消狀態(tài)值
private const int NotCanceledState = 1;
/// <summary>
/// 無參構(gòu)造初始化狀態(tài)
/// </summary>
public CancellationTokenSource() => _state = NotCanceledState;
/// <summary>
/// 定時(shí)取消構(gòu)造
/// </summary>
public CancellationTokenSource(TimeSpan delay)
{
//獲取timespan的毫秒數(shù)
long totalMilliseconds = (long)delay.TotalMilliseconds;
if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
{
throw new ArgumentOutOfRangeException(nameof(delay));
}
//調(diào)用InitializeWithTimer
InitializeWithTimer((int)totalMilliseconds);
}
public CancellationTokenSource(int millisecondsDelay)
{
if (millisecondsDelay < -1)
{
throw new ArgumentOutOfRangeException(nameof(millisecondsDelay));
}
//調(diào)用InitializeWithTimer
InitializeWithTimer(millisecondsDelay);
}
無參構(gòu)造函數(shù)沒啥好說的,就是給全局state狀態(tài)初始化NotCanceledState的初始值,也就是初始化狀態(tài)。我們比較關(guān)注的是可以定時(shí)取消的構(gòu)造函數(shù),雖然是兩個(gè)構(gòu)造函數(shù),但是殊途同歸,本質(zhì)都是傳遞的毫秒整形參數(shù),而且調(diào)用的核心方法都是InitializeWithTimer,看來是一個(gè)定時(shí)器操作,這樣不奇怪了,我們看下InitializeWithTimer方法的實(shí)現(xiàn)[點(diǎn)擊查看源碼👈]
//任務(wù)完成狀態(tài)值
private const int NotifyingCompleteState = 2;
//定時(shí)器
private volatile TimerQueueTimer? _timer;
//定時(shí)器回調(diào)初始化
private static readonly TimerCallback s_timerCallback = TimerCallback;
//定時(shí)器回調(diào)委托本質(zhì)是調(diào)用的CancellationTokenSource的NotifyCancellation方法
private static void TimerCallback(object? state) =>
((CancellationTokenSource)state!).NotifyCancellation(throwOnFirstException: false);
private void InitializeWithTimer(uint millisecondsDelay)
{
if (millisecondsDelay == 0)
{
//如果定時(shí)的毫秒為0,則設(shè)置全局狀態(tài)為NotifyingCompleteState
_state = NotifyingCompleteState;
}
else
{
//如果超時(shí)毫秒不為0則初始化定時(shí)器,并設(shè)置定時(shí)器定時(shí)的回調(diào)
_timer = new TimerQueueTimer(s_timerCallback, this, millisecondsDelay, Timeout.UnsignedInfinite, flowExecutionContext: false);
}
}
通過這個(gè)方法,我們可以非常清晰的看到定時(shí)初始化的核心操作其實(shí)就是初始化一個(gè)定時(shí)器,而定時(shí)的時(shí)間就是我們初始化傳遞的毫秒數(shù),其中s_timerCallback是定時(shí)的回調(diào)函數(shù),即如果等待超時(shí)之后則調(diào)用這個(gè)委托,其本質(zhì)正是CancellationTokenSource的NotifyCancellation方法,這個(gè)方法正是處理超時(shí)之后的操作[點(diǎn)擊查看源碼👈]
//信號(hào)控制類,通過信號(hào)判斷是否需要繼續(xù)執(zhí)行或阻塞
private volatile ManualResetEvent? _kernelEvent;
//throwOnFirstException函數(shù)是指示如果被取消了是否拋出異常
private void NotifyCancellation(bool throwOnFirstException)
{
//如果任務(wù)已經(jīng)取消則直接直接釋放定時(shí)器
if (!IsCancellationRequested && Interlocked.CompareExchange(ref _state, NotifyingState, NotCanceledState) == NotCanceledState)
{
TimerQueueTimer? timer = _timer;
if (timer != null)
{
_timer = null;
timer.Close();
}
//信號(hào)量涉及到了一個(gè)重要的屬性WaitHandle接下來會(huì)說
_kernelEvent?.Set();
//執(zhí)行取消操作,是取消操作的核心,講取消操作的時(shí)候咱們會(huì)著重說這個(gè)
ExecuteCallbackHandlers(throwOnFirstException);
Debug.Assert(IsCancellationCompleted, "Expected cancellation to have finished");
}
}
NotifyCancellation正是處理定時(shí)器到時(shí)的操作,說白了就是到了指定的時(shí)間但是沒有手動(dòng)取消執(zhí)行的操作,其實(shí)也是執(zhí)行的取消操作,這個(gè)方法里涉及到了兩個(gè)比較重要的點(diǎn),也是接下來我們會(huì)分析的點(diǎn),這里做一下說明
- 首先是ManualResetEvent這個(gè)實(shí)例,這個(gè)類的功能是通過信號(hào)機(jī)制控制是否阻塞或執(zhí)行后續(xù)操作,與之相輔的還有另一個(gè)類AutoResetEvent。這兩個(gè)類實(shí)現(xiàn)的效果是一致的,只是ManualResetEvent需要手動(dòng)重置初始狀態(tài),而AutoResetEvent則會(huì)自動(dòng)重置。有關(guān)兩個(gè)類的說明,這里不做過多介紹,有需要了解的同學(xué)們可以自行百度。而CancellationTokenSource類的一個(gè)重要屬性WaitHandle正是使用的它。
- 還有一個(gè)是ExecuteCallbackHandlers方法,這個(gè)是CancellationTokenSource執(zhí)行取消操作的核心操作。為了保證閱讀的順序性,咱們?cè)谥v取消操作的時(shí)候在重點(diǎn)講這個(gè)方法。
上面提到了,為了保證閱讀的順序性方便理解,咱們?cè)诒疚慕酉聛頃?huì)講解這兩部分,就不再初始化這里講解了,這里做一下標(biāo)記,以防大家覺得沒講清楚就繼續(xù)了。
小插曲WaitHandle
上面我們提到了CancellationTokenSource的WaitHandle屬性,它是基于ManualResetEvent實(shí)現(xiàn)的。這個(gè)算是一個(gè)稍微獨(dú)立的地方,我們可以先進(jìn)行講解一下[點(diǎn)擊查看源碼👈]
private volatile ManualResetEvent? _kernelEvent;
internal WaitHandle WaitHandle
{
get
{
ThrowIfDisposed();
//如果初始化過了則直接返回
if (_kernelEvent != null)
{
return _kernelEvent;
}
//初始化一個(gè)ManualResetEvent,給定初始值為false
var mre = new ManualResetEvent(false);
//線程安全操作如果有別的線程初始了則釋放上面初始化的操作
if (Interlocked.CompareExchange(ref _kernelEvent, mre, null) != null)
{
mre.Dispose();
}
//如果任務(wù)已取消則后續(xù)操作不阻塞
if (IsCancellationRequested)
{
_kernelEvent.Set();
}
return _kernelEvent;
}
}
通過這段代碼我們可以看到,如果使用了WaitHandle屬性則可以使用它實(shí)現(xiàn)簡(jiǎn)單的阻塞通知操作,也就是收到取消通知操作之后我們可以執(zhí)行WaitHandle之后的操作,但是WaitHandle是internal修飾的,我們?cè)撛趺词褂媚兀磕?,我們知道CancellationTokenSource的Token屬性獲取的是CancellationToken實(shí)例[點(diǎn)擊查看源碼👈]
public CancellationToken Token
{
get
{
ThrowIfDisposed();
return new CancellationToken(this);
}
}
直接實(shí)例化了一個(gè)CancellationToken實(shí)例返回去了,并傳遞了當(dāng)前CancellationTokenSource實(shí)例,找到CancellationToken的這個(gè)構(gòu)造函數(shù)[點(diǎn)擊查看源碼👈]
private readonly CancellationTokenSource? _source; internal CancellationToken(CancellationTokenSource? source) => _source = source; public WaitHandle WaitHandle => (_source ?? CancellationTokenSource.s_neverCanceledSource).WaitHandle;
通過上面的代碼我們可以看到通過CancellationToken實(shí)例便可以使用WaitHandle屬性,實(shí)現(xiàn)我們?cè)L問到它的效果,光是說的話可能有點(diǎn)迷糊,通過一個(gè)簡(jiǎn)單的示例我們來了解WaitHandle的使用方式,簡(jiǎn)單來看下
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = tokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("被取消了."));
tokenSource.CancelAfter(5000);
Task.Run(()=> {
System.Console.WriteLine("阻塞之前");
cancellationToken.WaitHandle.WaitOne();
System.Console.WriteLine("阻塞取消,執(zhí)行到了.");
});
System.Console.WriteLine("執(zhí)行到了這里");
在CancellationTokenSource為被取消之前WaitHandle.WaitOne()方法會(huì)阻塞后續(xù)執(zhí)行,也就是下面的輸出暫時(shí)不會(huì)輸出。等到CancellationTokenSource執(zhí)行了Cancel操作里調(diào)用了ManualResetEvent的Set方法停止阻塞,后續(xù)的輸出才會(huì)被執(zhí)行到這是一個(gè)同步操作,如果了解ManualResetEvent的同學(xué)相信對(duì)這個(gè)不難理解。為了演示效果我用Task演示異步的情況,所以執(zhí)行的結(jié)果如下所示
執(zhí)行到了這里
阻塞之前
阻塞取消,執(zhí)行到了.
被取消了.
注冊(cè)操作
上面我們大概講解了一些初始化相關(guān)的和一些輔助的操作,接下來我們看一下核心的注冊(cè)操作,注冊(cè)操作的用途就是注冊(cè)CancellationTokenSource取消或超時(shí)后需要執(zhí)行的動(dòng)作,而注冊(cè)Register的操作并未由CancellationTokenSource直接進(jìn)行,而是通過它的Token屬性即CancellationToken實(shí)例操作的,話不多說直接找到CancellationToken的Register方法[點(diǎn)擊查看源碼👈]
public CancellationTokenRegistration Register(Action callback) =>
Register(
s_actionToActionObjShunt,
callback ?? throw new ArgumentNullException(nameof(callback)),
useSynchronizationContext: false,
useExecutionContext: true);
它是直接調(diào)用自己的重載方法,注意幾個(gè)參數(shù),如果看細(xì)節(jié)的話還是要關(guān)注方法參數(shù)的。過程就省略了,直接找到最底層的方法[點(diǎn)擊查看源碼👈]
private CancellationTokenRegistration Register(Action<object?> callback, object? state, bool useSynchronizationContext, bool useExecutionContext)
{
if (callback == null)
throw new ArgumentNullException(nameof(callback));
//_source就是傳遞下來的CancellationTokenSource
CancellationTokenSource? source = _source;
//本質(zhì)是調(diào)用的CancellationTokenSource的InternalRegister方法
return source != null ?
source.InternalRegister(callback, state, useSynchronizationContext ? SynchronizationContext.Current : null, useExecutionContext ? ExecutionContext.Capture() : null) :
default;
從這個(gè)最底層的方法我們可以得知,其本質(zhì)還是調(diào)用CancellationTokenSource的InternalRegister方法,核心操作都不在CancellationToken還是在CancellationTokenSource類,CancellationToken更像是依賴CancellationTokenSource的表現(xiàn)類,看一下InternalRegister方法[點(diǎn)擊查看源碼👈]
//初始化CallbackPartition數(shù)組
private volatile CallbackPartition?[]? _callbackPartitions;
//獲取初始化上面數(shù)組的長(zhǎng)度,根據(jù)當(dāng)前CPU核心數(shù)獲取的
private static readonly int s_numPartitions = GetPartitionCount();
internal CancellationTokenRegistration InternalRegister(
Action<object?> callback, object? stateForCallback, SynchronizationContext? syncContext, ExecutionContext? executionContext)
{
//判斷有沒有被取消
if (!IsCancellationRequested)
{
//如果已被釋放直接返回
if (_disposed)
{
return default;
}
CallbackPartition?[]? partitions = _callbackPartitions;
if (partitions == null)
{
//首次調(diào)用初始化CallbackPartition數(shù)組
partitions = new CallbackPartition[s_numPartitions];
//判斷_callbackPartitions如果為null,則把partitions賦值給_callbackPartitions
partitions = Interlocked.CompareExchange(ref _callbackPartitions, partitions, null) ?? partitions;
}
//獲取當(dāng)前線程使用的分區(qū)下標(biāo)
int partitionIndex = Environment.CurrentManagedThreadId & s_numPartitionsMask;
//獲取一個(gè)CallbackPartition
CallbackPartition? partition = partitions[partitionIndex];
if (partition == null)
{
//初始化CallbackPartition實(shí)例
partition = new CallbackPartition(this);
//如果partitions的partitionIndex下標(biāo)位置為null則使用partition填充
partition = Interlocked.CompareExchange(ref partitions[partitionIndex], partition, null) ?? partition;
}
long id;
CallbackNode? node;
bool lockTaken = false;
//鎖住操作
partition.Lock.Enter(ref lockTaken);
try
{
id = partition.NextAvailableId++;
//獲取CallbackNode,這事真正存儲(chǔ)回調(diào)的地方,不要被List名字迷惑,其實(shí)是要構(gòu)建鏈表
node = partition.FreeNodeList;
if (node != null)
{
//這個(gè)比較有意思如果CallbackNode不是首次,則把最新的賦值給FreeNodeList
partition.FreeNodeList = node.Next;
}
else
{
//首次的時(shí)候初始化一個(gè)CallbackNode實(shí)例
node = new CallbackNode(partition);
}
node.Id = id;
//Register的回調(diào)操作賦值給了CallbackNode的Callback
node.Callback = callback;
node.CallbackState = stateForCallback;
node.ExecutionContext = executionContext;
node.SynchronizationContext = syncContext;
//構(gòu)建一個(gè)CallbackNode鏈表,從下面的代碼可以看出來構(gòu)建的其實(shí)是倒序鏈表,最新的CallbackNode是表頭
node.Next = partition.Callbacks;
if (node.Next != null)
{
node.Next.Prev = node;
}
//Callbacks記錄的是當(dāng)前的節(jié)點(diǎn),如果下一次進(jìn)來新節(jié)點(diǎn)則作為新節(jié)點(diǎn)的Next節(jié)點(diǎn)
partition.Callbacks = node;
}
finally
{
//釋放鎖
partition.Lock.Exit(useMemoryBarrier: false);
}
//用當(dāng)前注冊(cè)回調(diào)生成的CallbackNode節(jié)點(diǎn)生成CancellationTokenRegistration實(shí)例
var ctr = new CancellationTokenRegistration(id, node);
//如果未被取消則直接返回
if (!IsCancellationRequested || !partition.Unregister(id, node))
{
return ctr;
}
}
//走到這里說明IsCancellationRequested已經(jīng)等于true了也就是被取消了,則直接執(zhí)行該回調(diào)
callback(stateForCallback);
return default;
}
這里涉及到一個(gè)比較核心的類那就是CallbackPartition,這是一個(gè)內(nèi)部類,它的主要用途就是輔助構(gòu)建執(zhí)行回調(diào)的鏈表操作,其大概實(shí)現(xiàn)是這個(gè)樣子的[點(diǎn)擊查看源碼👈]
internal sealed class CallbackPartition
{
public readonly CancellationTokenSource Source;
//使用了自旋鎖
public SpinLock Lock = new SpinLock(enableThreadOwnerTracking: false);
public CallbackNode? Callbacks;
public CallbackNode? FreeNodeList;
public long NextAvailableId = 1;
public CallbackPartition(CancellationTokenSource source)
{
Source = source;
}
internal bool Unregister(long id, CallbackNode node)
{
//這里面有內(nèi)容,就不羅列了,判斷CallbackNode是否被取消注冊(cè),如果為false說明未被取消注冊(cè)
}
}
這里面我暫時(shí)沒有列出Unregister的內(nèi)容,因?yàn)樗呛腿∠嚓P(guān)的,說到取消的時(shí)候咱們?cè)倏?,如果返回true則說明取消成功。這個(gè)類核心就是輔助構(gòu)建Register回調(diào)鏈表的,它的核心都是在操作CallbackNode節(jié)點(diǎn)和其構(gòu)建的回調(diào)鏈表,而CallbackNode則是鏈表的一個(gè)節(jié)點(diǎn)定義,其大致結(jié)構(gòu)如下[點(diǎn)擊查看源碼👈]
internal sealed class CallbackNode
{
public readonly CallbackPartition Partition;
//構(gòu)建鏈表的核心Prev和Next
public CallbackNode? Prev;
public CallbackNode? Next;
public long Id;
//回調(diào)操作被這個(gè)委托記錄
public Action<object?>? Callback;
public object? CallbackState;
public ExecutionContext? ExecutionContext;
public SynchronizationContext? SynchronizationContext;
public CallbackNode(CallbackPartition partition)
{
Partition = partition;
}
public void ExecuteCallback()
{
//這里也有代碼,暫時(shí)不列出來,講取消的時(shí)候單獨(dú)講解
}
}
到了這里關(guān)于Register涉及到的核心操作都羅列出來了,由于貼出來的是源碼相關(guān)看著是比較蒙圈的,但是如果順著看的話其實(shí)還是大致的實(shí)現(xiàn)思路還是可以理解的,這里我大致的總結(jié)一下它的實(shí)現(xiàn)思路
- 首先是構(gòu)建了CallbackPartition數(shù)組,構(gòu)建這個(gè)數(shù)組的長(zhǎng)度是根據(jù)CPU的核心數(shù)來決定,每個(gè)CallbackPartition是操作的核心,為了防止過多的線程同時(shí)操作一個(gè)CallbackPartition實(shí)例,它采用了為不同線程分區(qū)的思路,CallbackPartition維護(hù)了構(gòu)建鏈表節(jié)點(diǎn)的類CallbackNode。
- CallbackNode是組成鏈表的核心,CallbackNode每個(gè)實(shí)例都是鏈表的一個(gè)節(jié)點(diǎn),從它自包含Prev和Next屬性便可以看出是一個(gè)雙向鏈表。
- CallbackPartition的核心功能就是為了構(gòu)建Register進(jìn)來的回調(diào),從上面的InternalRegister方法里的操作我們可以得知,通過CallbackPartition的輔助將CallbackNode節(jié)點(diǎn)構(gòu)建為一個(gè)倒序鏈表,也就是最新的CallbackNode實(shí)例是鏈表的首節(jié)點(diǎn),而最老的CallbackNode實(shí)例則是鏈表的尾節(jié)點(diǎn)。每一次Register進(jìn)來的回調(diào),都被包裝成了CallbackNode添加到這個(gè)鏈表中。
上面InternalRegister方法里我們看到操作CallbackNode的時(shí)候,使用了SpinLock自旋鎖。短時(shí)間鎖定的情況下SpinLock更快,因?yàn)樽孕i本質(zhì)上不會(huì)讓線程休眠,而是一直循環(huán)嘗試對(duì)資源訪問,直到可用。所以自旋鎖線程被阻塞時(shí),不進(jìn)行線程上下文切換,而是空轉(zhuǎn)等待。對(duì)于多核CPU而言,減少了切換線程上下文的開銷,從而提高了性能。
取消操作
上面我們看到了注冊(cè)相關(guān)的操作,注冊(cè)還是比較統(tǒng)一的,就一種操作方式。取消卻有兩種方式,一種是超時(shí)取消,另一種是主動(dòng)取消,接下來我們就分別看一下這兩種方式分別是如何操作的。
Cancel操作
首先我們來看主動(dòng)取消的操作方式這個(gè)是最簡(jiǎn)單最直接的方式,而且這個(gè)方法屬于CancellationTokenSource類,話不多說直接看實(shí)現(xiàn)[點(diǎn)擊查看源碼👈]
public void Cancel() => Cancel(false);
public void Cancel(bool throwOnFirstException)
{
ThrowIfDisposed();
NotifyCancellation(throwOnFirstException);
}
重點(diǎn)來了Cancel方法居然也是調(diào)用的NotifyCancellation方法,這個(gè)方法咱們上面已經(jīng)看過了。在說定時(shí)的方式構(gòu)造CancellationTokenSource的時(shí)候有一個(gè)自動(dòng)取消的操作,提到了NotifyCancellation方法的核心是ExecuteCallbackHandlers方法,這個(gè)是CancellationTokenSource執(zhí)行取消操作的核心操作。還說了為了保證閱讀的順序性,咱們?cè)谥v取消操作的時(shí)候在重點(diǎn)講這個(gè)方法。看來這個(gè)時(shí)刻終于還是到來了,直接打開ExecuteCallbackHandlers方法[點(diǎn)擊查看源碼👈]
private volatile int _threadIDExecutingCallbacks = -1;
private volatile CallbackPartition?[]? _callbackPartitions;
private const int NotifyingCompleteState = 3;
private void ExecuteCallbackHandlers(bool throwOnFirstException)
{
//獲取當(dāng)前線程ID
ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
//將_callbackPartitions置為null,但是partitions不為null,因?yàn)镋xchange返回的是改變之前的值
CallbackPartition?[]? partitions = Interlocked.Exchange(ref _callbackPartitions, null);
//如果partitions為null說明是回調(diào)已經(jīng)通知完成狀態(tài)了直接返回
if (partitions == null)
{
Interlocked.Exchange(ref _state, NotifyingCompleteState);
return;
}
List<Exception>? exceptionList = null;
try
{
//遍歷CallbackPartition數(shù)組
foreach (CallbackPartition? partition in partitions)
{
//CallbackPartition實(shí)例為null說明這個(gè)分區(qū)未被使用直接跳過
if (partition == null)
{
continue;
}
//循環(huán)處理CallbackNode鏈表
while (true)
{
CallbackNode? node;
bool lockTaken = false;
//鎖住當(dāng)前操作
partition.Lock.Enter(ref lockTaken);
try
{
//獲取鏈表的節(jié)點(diǎn)
node = partition.Callbacks;
//為null說明沒Register過直接中斷
if (node == null)
{
break;
}
else
{
//如果鏈表遍歷不是尾節(jié)點(diǎn),切斷和下一個(gè)節(jié)點(diǎn)的關(guān)聯(lián)
if (node.Next != null) node.Next.Prev = null;
//把下一個(gè)節(jié)點(diǎn)賦值給Callbacks
partition.Callbacks = node.Next;
}
//當(dāng)前執(zhí)行節(jié)點(diǎn)ID
_executingCallbackId = node.Id;
node.Id = 0;
}
finally
{
//退出鎖
partition.Lock.Exit(useMemoryBarrier: false);
}
try
{
//如果當(dāng)時(shí)傳遞了同步上下文則直接在當(dāng)時(shí)的上下文調(diào)用ExecuteCallback委托
if (node.SynchronizationContext != null)
{
node.SynchronizationContext.Send(static s =>
{
var n = (CallbackNode)s!;
n.Partition.Source.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
n.ExecuteCallback();
}, node);
ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
}
else
{
//如果沒有傳遞SynchronizationContext則直接調(diào)用ExecuteCallback委托
//即調(diào)用Register的注冊(cè)的委托
node.ExecuteCallback();
}
}
catch (Exception ex) when (!throwOnFirstException)
{
(exceptionList ??= new List<Exception>()).Add(ex);
}
}
}
}
finally
{
//將全局狀態(tài)置為通知完成狀態(tài)
//即已經(jīng)調(diào)用過Register回調(diào)
_state = NotifyingCompleteState;
Volatile.Write(ref _executingCallbackId, 0);
Interlocked.MemoryBarrier();
}
//如果中途存在異常則拋出
if (exceptionList != null)
{
Debug.Assert(exceptionList.Count > 0, $"Expected {exceptionList.Count} > 0");
throw new AggregateException(exceptionList);
}
}
關(guān)于ExecuteCallback方法是CallbackNode類的方法,也就是咱們上面羅列CallbackNode類結(jié)構(gòu)時(shí)被省略的方法,它的主要功能就是調(diào)用Register的回調(diào),也就是執(zhí)行Register里的委托。欠下的我會(huì)補(bǔ)上來,注意這里是CallbackNode類,接下來看下實(shí)現(xiàn)[點(diǎn)擊查看源碼👈]
public ExecutionContext? ExecutionContext;
public void ExecuteCallback()
{
ExecutionContext? context = ExecutionContext;
//如果Register的時(shí)候允許傳遞ExecutionContext則直接用這個(gè)上下文執(zhí)行回調(diào)Callback
//Callback委托也就是承載Register的委托操作
if (context != null)
{
ExecutionContext.RunInternal(context, static s =>
{
Debug.Assert(s is CallbackNode, $"Expected {typeof(CallbackNode)}, got {s}");
CallbackNode n = (CallbackNode)s;
Debug.Assert(n.Callback != null);
n.Callback(n.CallbackState);
}, this);
}
else
{
Debug.Assert(Callback != null);
//直接在當(dāng)前線程調(diào)用Callback
//Callback委托也就是承載Register的委托操作
Callback(CallbackState);
}
}
關(guān)于取消的核心方法ExecuteCallbackHandlers的重要操作,咱們已經(jīng)羅列出來了,其實(shí)我們看到注冊(cè)的思路的時(shí)候,就已經(jīng)能猜到執(zhí)行取消回調(diào)的大致思路了,既然Register的時(shí)候進(jìn)行了拉鏈,那么取消執(zhí)行注冊(cè)回調(diào)肯定是變量鏈表執(zhí)行里面的Callback了,大致總結(jié)一下
- 執(zhí)行Cancel之后核心操作還是針對(duì)構(gòu)建的CallbackNode鏈表進(jìn)行遍歷,咱們之前說過構(gòu)建的CallbackNode鏈表是倒序鏈表,最新的節(jié)點(diǎn)放在鏈表的首部,這也就解釋了為啥我們上面的示例Register多個(gè)委托的時(shí)候,最先輸出的是最后注冊(cè)委托。
- Register注冊(cè)時(shí)候有參數(shù)判斷是否需要傳遞當(dāng)前同步上下文SynchronizationContext和執(zhí)行上下文ExecutionContext,作用就是為了是否在當(dāng)時(shí)的上下文環(huán)境執(zhí)行Callback回調(diào)操作。
- 上面的遍歷代碼我們看到了會(huì)執(zhí)行CallbackNode.Next.Prev=null的操作,是為了斷開當(dāng)前鏈表節(jié)點(diǎn)和上下節(jié)點(diǎn)的關(guān)系,個(gè)人感覺是為了切斷對(duì)象引用方便釋放的,防止內(nèi)存泄漏,同時(shí)也說明了默認(rèn)情況下Register的的回調(diào)函數(shù)執(zhí)行是一次性的,當(dāng)執(zhí)行完Cancel操作之后當(dāng)前CancellationToken實(shí)例也就失效了。
CancelAfter操作
之前我們演示的時(shí)候說過有兩種方式可以執(zhí)行超時(shí)取消操作,一種是在構(gòu)建CancellationTokenSource實(shí)例構(gòu)造的時(shí)候傳遞超時(shí)時(shí)間,還有另一種是使用CancelAfter操作,這個(gè)方法表示在指定時(shí)間之后取消,效果上等同于實(shí)例化CancellationTokenSource的時(shí)候傳遞超時(shí)時(shí)間的操作,廢話不多說直接羅列代碼[點(diǎn)擊查看源碼👈]
public void CancelAfter(TimeSpan delay)
{
long totalMilliseconds = (long)delay.TotalMilliseconds;
if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
{
throw new ArgumentOutOfRangeException(nameof(delay));
}
//調(diào)用的是重載的CancelAfter方法
CancelAfter((int)totalMilliseconds);
}
private static readonly TimerCallback s_timerCallback = obj =>
{
((CancellationTokenSource)obj).NotifyCancellation(throwOnFirstException: false);
};
public void CancelAfter(int millisecondsDelay)
{
//傳遞的毫秒數(shù)不能小于-1
if (millisecondsDelay < -1)
{
throw new ArgumentOutOfRangeException(nameof(millisecondsDelay));
}
//如果已經(jīng)取消則直接返回
if (IsCancellationRequested)
{
return;
}
//注冊(cè)一個(gè)定時(shí)器執(zhí)行s_timerCallback
//s_timerCallback在上面我們介紹過了 本這就是調(diào)用CancellationTokenSource的NotifyCancellation方法
TimerQueueTimer? timer = _timer;
if (timer == null)
{
timer = new TimerQueueTimer(s_timerCallback, this, Timeout.UnsignedInfinite, Timeout.UnsignedInfinite, flowExecutionContext: false);
TimerQueueTimer? currentTimer = Interlocked.CompareExchange(ref _timer, timer, null);
if (currentTimer != null)
{
timer.Close();
timer = currentTimer;
}
}
try
{
timer.Change((uint)millisecondsDelay, Timeout.UnsignedInfinite);
}
catch (ObjectDisposedException)
{
}
}
通過上面的源碼我們可以看到CancelAfter的操作代碼和傳遞超時(shí)時(shí)間構(gòu)造CancellationTokenSource的代碼基本上是一致的,都是通過TimerQueueTimer的方式定時(shí)觸發(fā)調(diào)用CancellationTokenSource的NotifyCancellation方法,而NotifyCancellation方法的核心實(shí)現(xiàn)就是ExecuteCallbackHandlers方法,這些方法咱們上面都有講解過,就不重復(fù)介紹了,這樣關(guān)于取消相關(guān)的操作我們也就全部講解完成了。
總結(jié)
本文我們主要講解了C#取消令牌CancellationTokenSource,雖然設(shè)計(jì)到的類并不多,但是這部分源碼并不少,而且也只是講解核心功能的部分源碼,有興趣的同學(xué)可以自行閱讀這個(gè)類相關(guān)代碼,如果你覺得你的GitHub比較不給力推薦一個(gè)可以閱讀CoreCLR源碼的網(wǎng)站source.dot.net這個(gè)網(wǎng)站看到的是目前CoreCLR最新的源碼,可以直接連接到GitHub非常方便,但是最新版本的源碼和穩(wěn)定版本的有些差別,這個(gè)還需要注意。由于文章比較長(zhǎng),再加上筆者技術(shù)能力和文筆能力都有限,這里做一下簡(jiǎn)單的總結(jié)
- CancellationTokenSource的用途就是可以感知到取消操作,其中涉及到的Register回調(diào)、WaitHandle、IsCancellationRequested都能實(shí)現(xiàn)這個(gè)功能,當(dāng)然它還支持超時(shí)取消操作。
- CancellationTokenSource的Register和Cancel相關(guān)成雙成對(duì)的,雖然有CancelAfter和構(gòu)造傳遞超時(shí)時(shí)間的方式,其本質(zhì)和Cancel操作是一樣的。
- CancellationTokenSource的核心操作原理,是通過CallbackPartition和CallbackNode構(gòu)建倒序鏈表,Register的時(shí)候通過Callback委托構(gòu)建鏈表,Cancel的時(shí)候遍歷構(gòu)建的鏈表執(zhí)行Callback,雖然有一堆額外操作,但是核心工作方式就是鏈表操作。
- 需要注意的是,默認(rèn)情況下CancellationTokenSource產(chǎn)生的CancellationToken是一次性的,取消了之后是沒有辦法進(jìn)行重置的,當(dāng)然微軟已經(jīng)為我們提供了IChangeToken去解決了CancellationToken重復(fù)觸發(fā)的問題,請(qǐng)放心使用。
由于本篇文章篇幅較長(zhǎng),加上筆者能力有限,文筆更是一般,如果講解的不清楚還望諒解,或者感興趣的同學(xué)可以自行閱讀源碼。關(guān)于看源碼每個(gè)人都有自己的關(guān)注點(diǎn),我一般的初衷都是弄明白它的原理,順便學(xué)習(xí)下它代碼風(fēng)格或思路。學(xué)無止境,結(jié)果有時(shí)候并不那么重要,過程才重要。就和許多人追求自己能有到達(dá)什么樣的高度,成功其實(shí)只是成長(zhǎng)過程中順便的一種表現(xiàn),就和你如果不滿現(xiàn)狀,說明你在很早之前沒想過改變自己一樣。
到此這篇關(guān)于運(yùn)用示例簡(jiǎn)單講解C#取消令牌CancellationTokenSource的文章就介紹到這了,更多相關(guān)C# CancellationTokenSource內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C#使用dynamic一行代碼實(shí)現(xiàn)反射操作
dynamic的出現(xiàn)讓C#具有了弱語言類型的特性。編譯器在編譯的時(shí)候不再對(duì)類型進(jìn)行檢查,編譯時(shí)默認(rèn)dynamic對(duì)象支持你想要的任何特性,這篇文章主要介紹了C#用dynamic一行代碼實(shí)現(xiàn)反射操作,需要的朋友可以參考下2023-04-04
C#比較二個(gè)數(shù)組并找出相同或不同元素的方法
這篇文章主要介紹了C#比較二個(gè)數(shù)組并找出相同或不同元素的方法,涉及C#針對(duì)數(shù)組的交集、補(bǔ)集等集合操作相關(guān)技巧,非常簡(jiǎn)單實(shí)用,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-11-11
C#實(shí)現(xiàn)獲取枚舉中元素個(gè)數(shù)的方法
這篇文章主要介紹了C#實(shí)現(xiàn)獲取枚舉中元素個(gè)數(shù)的方法,是深入理解C#程序設(shè)計(jì)所需要掌握的基本技巧,需要的朋友可以參考下2014-08-08
在C#中基于Semantic?Kernel的檢索增強(qiáng)生成(RAG)實(shí)踐記錄
SemanticKernel是一個(gè)用于集成和操作大語言模型的應(yīng)用程序框架,支持C#、Python和Java等多種編程語言,通過SemanticKernel,開發(fā)者可以輕松構(gòu)建基于最新AI技術(shù)的應(yīng)用程序2024-10-10

