C# 線程安全詳解
介紹
在 .NET4.0 之前,如果我們需要在多線程環(huán)境下使用 Dictionary 類,除了自己實現(xiàn)線程同步來保證線程安全外,我們沒有其他選擇。很多開發(fā)人員肯定都實現(xiàn)過類似的線程安全方案,可能是通過創(chuàng)建全新的線程安全字典,或者僅是簡單的用一個類封裝一個 Dictionary 對象,并在所有方法中加上鎖機制,我們稱這種方案叫 “Dictionary+Locks” 。
System.Collections.Concurrent 命名空間下提供多個線程安全集合類,只要多個線程同時訪問集合,就應使用這些類來代替 System.Collections 和 System.Collections.Generic 命名空間中的相應類型。 但是,不保證通過擴展方法或通過顯式接口實現(xiàn)訪問集合對象是線程安全的,可能需要由調(diào)用方進行同步。
經(jīng)典生產(chǎn)消費問題
介紹
這個問題是最為經(jīng)典的多線程應用問題問題就是:有一個或多個線程(生產(chǎn)者線程)產(chǎn)生一些數(shù)據(jù),還有一個或者多個線程(消費者線程)要取出這些數(shù)據(jù)并執(zhí)行一些相應的工作。

Queue
接下來,我們是使用程序去描述這個問題,看下面代碼
static void Main(string[] args)
{
int count = 0;
// 臨界資源區(qū)
var queue = new Queue<string>();
// 生產(chǎn)者線程
Task.Factory.StartNew(() =>
{
while (true)
{
queue.Enqueue("mesg" + count);
count++;
}
});
// 消費者線程1
Task.Factory.StartNew(() =>
{
while (true)
{
if (queue.Count > 0)
{
string value = queue.Dequeue();
Console.WriteLine("Worker A: " + value);
}
}
});
// 消費者線程2
Task.Factory.StartNew(() =>
{
while (true)
{
if (queue.Count > 0)
{
string value = queue.Dequeue();
Console.WriteLine("Worker B: " + value);
}
}
});
Thread.Sleep(50000);
}
我們使用 Queue 模擬了一個簡單的資源池,一個生產(chǎn)者放數(shù)據(jù),兩個消費者消費數(shù)據(jù)。
這個程序運行以后會產(chǎn)生異常,異常的原因很簡單。當某時刻,第一個消費者判斷 queue.Count > 0 為true 時,就會到 Queue 中取數(shù)據(jù)。但是,此時這個數(shù)據(jù)可能會被第二個消費者拿走了,因為第二個消費者也判斷出此時有數(shù)據(jù)可取。第一個消費者取取數(shù)據(jù)時就會發(fā)生異常,這就是一個簡單的臨界資源線程安全問題。
知道問題了,那么如何解決呢?有兩種方案,接下來進行講解
ConcurrentQueue
1 . 加鎖
這個方案是可行的,很多時候我們也是這么做的,包括微軟早期實現(xiàn)線程安全的 ArrayList 和 Hashtable 內(nèi)部 (Synchronized方法) 也是這么實現(xiàn)的。這個方案適用于只有少量的消費者,并且每個消費者都會執(zhí)行大量操作的時候,這時 lock 并沒什么太大問題,但是,如果是大批量短小精悍的消費者存在的話,lock 會嚴重影響代碼的執(zhí)行效率。
2 . 線程安全的集合區(qū)
這個就是 .NET4.0 后 System.Collections.Concurrent 命名空間下提供多個線程安全集合類方案。
新的線程安全的這些集合內(nèi)部不再使用lock機制這種比較低效的方式去實現(xiàn)線程安全,而是轉(zhuǎn)而使用SpinWait 和 Interlocked 等機制,間接實現(xiàn)了線程安全,這種方式的效率要高于使用lock的方式。
var queue = new ConcurrentQueue<string>();
Task.Factory.StartNew(() =>
{
while (true)
{
queue.Enqueue("msg" + count);
count++;
}
});
Task.Factory.StartNew(() =>
{
while (true)
{
string value;
if (queue.TryDequeue(out value))
{
Console.WriteLine("Worker A: " + value);
}
}
});
Task.Factory.StartNew(() =>
{
while (true)
{
string value;
if (queue.TryDequeue(out value))
{
Console.WriteLine("Worker B: " + value);
}
}
});
ConcurrentQueue.TryDequeue(T) 方法會嘗試獲取消費,那能不能不要去判斷集合是否為空,集合當自己沒有元素的時候自己 Block 一下可以嗎?答案是,可以的
BlockingCollection
針對上面的問題,我們可以使用 BlockingCollection 即可。接下來我來看
var blockingCollection = new BlockingCollection<string>();
Task.Factory.StartNew(() =>
{
while (true)
{
blockingCollection.Add("msg" + count);
count++;
}
});
Task.Factory.StartNew(() =>
{
while (true)
{
Console.WriteLine("Worker A: " + blockingCollection.Take());
}
});
Task.Factory.StartNew(() =>
{
while (true)
{
Console.WriteLine("Worker B: " + blockingCollection.Take());
}
});
BlockingCollection 集合是一個擁有阻塞功能的集合,它就是完成了經(jīng)典生產(chǎn)者消費者的算法功能。它沒有實現(xiàn)底層的存儲結(jié)構(gòu),而是使用了實現(xiàn) IProducerConsumerCollection 接口的幾個集合作為底層的數(shù)據(jù)結(jié)構(gòu),例如 ConcurrentBag, ConcurrentStack 或者是 ConcurrentQueue。你可以在構(gòu)造BlockingCollection 實例的時候傳入這個參數(shù),如果不指定的話,則默認使用 ConcurrentQueue 作為存儲結(jié)構(gòu)。
而對于生產(chǎn)者來說,只需要通過調(diào)用其Add方法放數(shù)據(jù),消費者只需要調(diào)用Take方法來取數(shù)據(jù)就可以了。
當然了上面的消費者代碼中還有一點是讓人不爽的,那就是 while 語句,可以更優(yōu)雅一點嗎?答案是,可以的。
Task.Factory.StartNew(() =>
{
foreach (string value in blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine("Worker A: " + value);
}
});
BlockingCollection.GetConsumingEnumerable 方法是關(guān)鍵,這個方法會遍歷集合取出數(shù)據(jù),一旦發(fā)現(xiàn)集合空了,則阻塞自己,直到集合中又有元素了再開始遍歷。
此時,完美了解決了生產(chǎn)者消費者問題。然而通常來說,還有下面兩個問題我們有時需要去控制
1 . 控制集合中數(shù)據(jù)的最大數(shù)量
這個問題由 BlockingCollection 構(gòu)造函數(shù)解決,構(gòu)造該對象實例的時候,構(gòu)造函數(shù)中的 BoundedCapacity 決定了集合最大的可容納數(shù)據(jù)數(shù)量,這個比較簡單。
2 . 何時停止的問題
這個問題由 CompleteAdding 和 IsCompleted 兩個配合解決。CompleteAdding 方法是直接不允許任何元素被加入集合;當使用了 CompleteAdding 方法后且集合內(nèi)沒有元素的時候,另一個屬性 IsCompleted 此時會為 True,這個屬性可以用來判斷是否當前集合內(nèi)的所有元素都被處理完。生產(chǎn)者修改后的代碼:
Task.Factory.StartNew(() =>
{
for (int count = 0; count < 10; count++)
{
blockingCollection.Add("msg" + count);
}
blockingCollection.CompleteAdding();
});
當使用了 CompleteAdding 方法后,對象停止往集合中添加數(shù)據(jù),這時如果是使用 GetConsumingEnumerable 枚舉的,那么這種枚舉會自然結(jié)束,不會再 Block 住集合,這種方式最優(yōu)雅,也是推薦的寫法。
但是如果是使用 TryTake 訪問元素的,則需要使用 IsCompleted 判斷一下,因為這個時候使用 TryTake 會拋InvalidOperationException 異常。接著我們看下最后的完整代碼:
static void Main(string[] args)
{
var blockingCollection = new BlockingCollection<string>();
var producer = Task.Factory.StartNew(() =>
{
for (int count = 0; count < 10; count++)
{
blockingCollection.Add("msg" + count);
Thread.Sleep(300);
}
blockingCollection.CompleteAdding();
});
var consumer1 = Task.Factory.StartNew(() =>
{
foreach (string value in blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine("Worker A: " + value);
}
});
var consumer2 = Task.Factory.StartNew(() =>
{
foreach (string value in blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine("Worker B: " + value);
}
});
Task.WaitAll(producer, consumer1, consumer2);
}
BlockingCollection 枚舉
此外,需要注意 BlockingCollection 有兩種枚舉方法,
1 . foreach
首先 BlockingCollection 本身繼承自IEnumerable,所以它自己就可以被 foreach 枚舉,首先 BlockingCollection 包裝了一個線程安全集合,那么它自己也是線程安全的,而當多個線程在同時修改或訪問線程安全容器時,BlockingCollection 自己作為 IEnumerable 會返回一個一定時間內(nèi)的集合片段,也就是只會枚舉在那個時間點上內(nèi)部集合的元素。使用這種方式枚舉的時候,不會有 Block 效果。
2 . GetConsumingEnumerable
另外一種方式就是我們上面使用的 GetConsumingEnumerable 方式的枚舉,這種方式會有 Block 效果,直到 CompleteAdding 被調(diào)用為止。
BlockingCollection 擴展
實現(xiàn) IProducerConsumerCollection 接口的幾個集合:ConcurrentBag (線程安全的無序的元素集合), ConcurrentStack (線程安全的堆棧) 和 ConcurrentQueue (線程安全的隊列)。這些都很簡單,功能與非線程安全的那些集合都一樣,只不過是多了 TryXXX 方法,多線程環(huán)境下使用這些方法就好了。
System.Collections.Concurrent
System.Collections.Concurrent 下面還有一些其他與多線程相關(guān)的集合,有些個類在原來的基礎上也添加了一下新的方法,例如:AddOrUpdate,GetOrAdd,TryXXX 等等,都很容易理解。
總結(jié)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
c#使用filesystemwatcher實時監(jiān)控文件目錄的添加和刪除
本文主要描述如何通過c#實現(xiàn)實時監(jiān)控文件目錄下的變化,包括文件和目錄的添加,刪除,修改和重命名等操作2014-01-01
C# Xamarin利用ZXing.Net.Mobile進行掃碼的方法
這篇文章主要介紹了C# Xamarin利用ZXing.Net.Mobile進行掃碼的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-06-06
C#實現(xiàn)Post數(shù)據(jù)或文件到指定的服務器進行接收
這篇文章主要為大家詳細介紹了如何通過C#實現(xiàn)Post數(shù)據(jù)或文件到指定的服務器進行接收,文中的示例代碼講解詳細,需要的小伙伴可以參考下2024-03-03

