C# BlockingCollection的使用小結(jié)
什么是BlockingCollection<T>
BlockingCollection<T> 是一個(gè)線程安全的集合,它提供了一種機(jī)制,允許一個(gè)或多個(gè)生產(chǎn)者線程將數(shù)據(jù)添加到集合中,同時(shí)允許一個(gè)或多個(gè)消費(fèi)者線程從集合中取出數(shù)據(jù)。它內(nèi)部封裝了一個(gè)線程安全的集合(如 ConcurrentQueue<T>、ConcurrentStack<T> 或 ConcurrentBag<T>),并提供了阻塞和限制集合大小的功能。
主要特點(diǎn)
- 線程安全:內(nèi)部使用鎖或其他同步機(jī)制,確保在多線程環(huán)境下對集合的操作是安全的。
- 阻塞操作:當(dāng)集合為空時(shí),消費(fèi)者線程會阻塞等待,直到有數(shù)據(jù)可用;當(dāng)集合達(dá)到最大容量時(shí),生產(chǎn)者線程會阻塞等待,直到有空間可用。
- 限制大?。嚎梢酝ㄟ^構(gòu)造函數(shù)指定集合的最大容量。
- 支持多種底層集合:可以使用
ConcurrentQueue<T>(默認(rèn))、ConcurrentStack<T>或ConcurrentBag<T>作為底層存儲結(jié)構(gòu)。
構(gòu)造函數(shù)
BlockingCollection<T> 提供了多種構(gòu)造方式:
// 使用默認(rèn)的 ConcurrentQueue<T>,無容量限制 var blockingCollection = new BlockingCollection<int>(); // 使用默認(rèn)的 ConcurrentQueue<T>,并指定最大容量 var blockingCollection = new BlockingCollection<int>(10); // 指定底層集合類型 var blockingCollection = new BlockingCollection<int>(new ConcurrentStack<int>());
常用方法
生產(chǎn)者操作
Add(T item):將一個(gè)元素添加到集合中。如果集合已滿,會拋出異常。TryAdd(T item):嘗試將一個(gè)元素添加到集合中。如果集合已滿,返回false。TryAdd(T item, TimeSpan timeout):嘗試在指定的超時(shí)時(shí)間內(nèi)將元素添加到集合中。CompleteAdding():標(biāo)記集合不再添加新的元素。消費(fèi)者線程在集合為空時(shí)會收到通知并退出。
消費(fèi)者操作
Take():從集合中取出一個(gè)元素。如果集合為空,線程會阻塞等待。TryTake(out T item):嘗試從集合中取出一個(gè)元素。如果集合為空,返回false。TryTake(out T item, TimeSpan timeout):嘗試在指定的超時(shí)時(shí)間內(nèi)從集合中取出一個(gè)元素。GetConsumingEnumerable():返回一個(gè)可枚舉的集合,消費(fèi)者可以使用foreach遍歷集合中的元素。當(dāng)調(diào)用CompleteAdding()后,枚舉會結(jié)束。
示例代碼
以下是一個(gè)簡單的生產(chǎn)者-消費(fèi)者示例,使用 BlockingCollection<T> 實(shí)現(xiàn):
using System;
using System.Collections.Concurrent;
using System.Threading;
class Program
{
static void Main()
{
// 創(chuàng)建一個(gè)容量為 5 的 BlockingCollection
var blockingCollection = new BlockingCollection<int>(5);
// 啟動(dòng)生產(chǎn)者線程
Thread producerThread = new Thread(() =>
{
for (int i = 1; i <= 10; i++)
{
blockingCollection.Add(i); // 添加元素
Console.WriteLine($"Producer added: {i}");
Thread.Sleep(500); // 模擬生產(chǎn)時(shí)間
}
blockingCollection.CompleteAdding(); // 標(biāo)記不再添加元素
});
// 啟動(dòng)消費(fèi)者線程
Thread consumerThread = new Thread(() =>
{
foreach (var item in blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumer consumed: {item}");
Thread.Sleep(1000); // 模擬消費(fèi)時(shí)間
}
});
producerThread.Start();
consumerThread.Start();
producerThread.Join();
consumerThread.Join();
}
}
輸出示例
Producer added: 1
Producer added: 2
Consumer consumed: 1
Producer added: 3
Consumer consumed: 2
Producer added: 4
Consumer consumed: 3
Producer added: 5
Consumer consumed: 4
Producer added: 6
Consumer consumed: 5
Producer added: 7
Consumer consumed: 6
Producer added: 8
Consumer consumed: 7
Producer added: 9
Consumer consumed: 8
Producer added: 10
Consumer consumed: 9
Consumer consumed: 10
注意事項(xiàng)
- 線程安全:
BlockingCollection<T>是線程安全的,但需要確保對集合的操作不會與其他非線程安全的操作混用。 - 容量限制:如果集合滿了,生產(chǎn)者線程會阻塞,因此需要合理設(shè)置容量。
- 異常處理:在生產(chǎn)者調(diào)用
Add()或消費(fèi)者調(diào)用Take()時(shí),可能會拋出異常(如集合已滿或已標(biāo)記為完成添加)。建議使用TryAdd()和TryTake()方法來避免異常。BlockingCollection<T>是C#中實(shí)現(xiàn)線程安全的生產(chǎn)者-消費(fèi)者模式的利器,它簡化了線程同步的復(fù)雜性,非常適合多線程編程場景。
串口接收
在使用 BlockingCollection<T> 存儲串口接收的數(shù)據(jù),并在其他線程中取出時(shí),是否能保證數(shù)據(jù)的順序,主要取決于以下兩個(gè)因素:
底層存儲的類型
BlockingCollection<T> 允許指定底層存儲的類型。默認(rèn)情況下,它使用 ConcurrentQueue<T> 作為底層存儲,而 ConcurrentQueue<T> 是一個(gè)先進(jìn)先出 FIFO的隊(duì)列。這意味著數(shù)據(jù)的添加順序和取出順序是一致的,因此可以保證順序。
如果你使用其他類型的底層存儲(如 ConcurrentStack<T> 或自定義的線程安全集合),則順序可能會有所不同。例如:
ConcurrentQueue<T>:保證 FIFO 順序。ConcurrentStack<T>:保證 LIFO(后進(jìn)先出)順序。
線程安全和并發(fā)訪問
BlockingCollection<T> 是線程安全的,因此即使在多線程環(huán)境下,數(shù)據(jù)的添加和取出操作也是安全的。只要底層存儲是 FIFO 的(如 ConcurrentQueue<T>),數(shù)據(jù)的順序就能得到保證。
串口數(shù)據(jù)接收的順序性
串口通信本身是按字節(jié)順序接收數(shù)據(jù)的,因此只要數(shù)據(jù)是逐字節(jié)接收并立即添加到 BlockingCollection<T> 中,數(shù)據(jù)的順序就能得到保證。
示例代碼
以下是一個(gè)示例,展示如何使用 BlockingCollection<T> 存儲串口接收的數(shù)據(jù),并在其他線程中按順序取出:
using System;
using System.Collections.Concurrent;
using System.IO.Ports;
using System.Threading;
class SerialPortExample
{
private SerialPort _serialPort;
private BlockingCollection<string> _dataQueue = new BlockingCollection<string>();
public SerialPortExample(string portName)
{
_serialPort = new SerialPort(portName)
{
BaudRate = 9600,
DataBits = 8,
Parity = Parity.None,
StopBits = StopBits.One,
ReadTimeout = 500
};
_serialPort.DataReceived += SerialPort_DataReceived;
}
private void SerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e)
{
try
{
string data = _serialPort.ReadLine(); // 假設(shè)數(shù)據(jù)以換行符分隔
_dataQueue.Add(data); // 將數(shù)據(jù)添加到阻塞集合
Console.WriteLine($"Received and added: {data}");
}
catch (Exception ex)
{
Console.WriteLine($"Error in DataReceived: {ex.Message}");
}
}
public void Start()
{
_serialPort.Open();
Thread consumerThread = new Thread(ConsumeData);
consumerThread.Start();
}
private void ConsumeData()
{
foreach (var data in _dataQueue.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {data}");
// 處理數(shù)據(jù)
}
}
public void Stop()
{
_dataQueue.CompleteAdding();
_serialPort.Close();
}
static void Main()
{
SerialPortExample example = new SerialPortExample("COM3");
example.Start();
Console.WriteLine("Press Enter to exit...");
Console.ReadLine();
example.Stop();
}
}
關(guān)鍵點(diǎn)
- 底層存儲:使用
ConcurrentQueue<T>(默認(rèn))可以保證數(shù)據(jù)的 FIFO 順序。 - 線程安全:
BlockingCollection<T>是線程安全的,因此在多線程環(huán)境下不會出現(xiàn)數(shù)據(jù)順序混亂的問題。 - 串口數(shù)據(jù)接收:只要串口接收的數(shù)據(jù)是按順序添加到
BlockingCollection<T>中的,順序就能得到保證。
因此,只要使用默認(rèn)的 ConcurrentQueue<T> 作為底層存儲,并且正確處理串口數(shù)據(jù)的接收和添加,BlockingCollection<T> 是可以保證數(shù)據(jù)順序的。
到此這篇關(guān)于C# BlockingCollection的使用小結(jié)的文章就介紹到這了,更多相關(guān)c# blockingcollection內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于WPF實(shí)現(xiàn)PDF的顯示與轉(zhuǎn)換
這篇文章為大家詳細(xì)主要介紹了如何基于WPF實(shí)現(xiàn)PDF的顯示并轉(zhuǎn)換成圖片,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-12-12
C#使用正則表達(dá)式實(shí)現(xiàn)漢字轉(zhuǎn)拼音
這篇文章主要為大家詳細(xì)介紹了C#如何使用正則表達(dá)式實(shí)現(xiàn)漢字轉(zhuǎn)拼音的功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-01-01

