C#使用channel實(shí)現(xiàn)Plc異步任務(wù)之間的通信
channel 通信的例子:
using ConsoleApp2;
using System.Collections.Concurrent;
using System.Threading.Channels;
var queue = new BlockingCollection<Message>(new ConcurrentQueue<Message>());
var opt = new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = true,
Capacity = 100 //最大容量
};
//有限的
var channelTest = Channel.CreateBounded<Message>(opt);
//無(wú)限的
var channel = Channel.CreateUnbounded<Message>();
var sender1 = SendMessageThreadAsync(channel.Writer, 1);
var sender2 = SendMessageThreadAsync(channel.Writer, 2);
var receiver1 = ReceiveMessageThreadAsync(channel.Reader, 3);
var receiver2 = ReceiveMessageThreadAsync(channel.Reader, 4);
//await sender;
// make sure all messages are received
await Task.WhenAll(sender1, sender2);
channel.Writer.Complete();
await Task.WhenAll(receiver1, receiver2);
//await receiver;
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
async Task SendMessageThreadAsync(ChannelWriter<Message> writer, int id)
{
for (int i = 0; i < 20; i++)
{
await writer.WriteAsync(new Message(id, i.ToString()));
Console.WriteLine($"Thread {id} sent {i}");
await Task.Delay(100);
}
}
async Task ReceiveMessageThreadAsync(ChannelReader<Message> reader, int id)
{
//try
//{
// while (!reader.Completion.IsCompleted)
// {
// var message = await reader.ReadAsync();
// Console.WriteLine($"Thread {id} received {message.Content}");
// }
//}
//catch (Exception ex)
//{
// Console.WriteLine($"Thread {id} channel closed:{ex.Message}");
//}
await foreach (var message in reader.ReadAllAsync())
{
Console.WriteLine($"Thread {id} received {message.Content}");
}
}
record Message(int FromId, string Content);
改造為Plc的實(shí)例
record PlcDataMessage
{
public bool IsConnected { get; init; }
public DbData DbData { get; init; }
// 可以添加其他需要傳遞的信息
}
// 創(chuàng)建一個(gè)無(wú)邊界的Channel來(lái)發(fā)送和接收消息
var plcDataChannel = Channel.CreateUnbounded<PlcDataMessage>();
// 啟動(dòng)一個(gè)新的任務(wù)來(lái)模擬PLC數(shù)據(jù)讀取
Task.Factory.StartNew(async () =>
{
var cts = new CancellationTokenSource(); // 假設(shè)您已經(jīng)有了取消令牌源
while (!cts.IsCancellationRequested)
{
try
{
// ... 省略了連接PLC的代碼,這部分邏輯保持不變 ...
if (MyIsConnected)
{
DbData dbDataTemp = await s7Plc.ReadClassAsync<DbData>(42, 0);
// 心跳和其他操作...
// 構(gòu)造消息并發(fā)送到Channel
var message = new PlcDataMessage
{
IsConnected = MyIsConnected,
DbData = dbDataTemp
};
await plcDataChannel.Writer.WriteAsync(message, cts.Token);
}
// ... 其他邏輯保持不變 ...
}
catch (Exception ex)
{
// 處理異常并重新連接PLC(如果需要)
// ...
// 可以通過(guò)Channel發(fā)送一個(gè)特殊的消息來(lái)表示連接已斷開(kāi)或發(fā)生了錯(cuò)誤
// 這里省略了這部分邏輯
// 休眠一段時(shí)間后再重試
await Task.Delay(2000, cts.Token);
}
}
// 完成后通知Channel不再發(fā)送更多數(shù)據(jù)
plcDataChannel.Writer.Complete();
}, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
// 在另一個(gè)任務(wù)或線程中讀取Channel中的數(shù)據(jù)
Task.Run(async () =>
{
await foreach (var message in plcDataChannel.Reader.ReadAllAsync(cts.Token))
{
if (message.IsConnected)
{
lock (lockObj)
{
// 更新dbData,這里假設(shè)dbData是一個(gè)線程安全的對(duì)象或結(jié)構(gòu)
dbData.Str_S = message.DbData.Str_S.Trim();
// ... 更新其他屬性 ...
}
// 處理讀取到的數(shù)據(jù)...
}
else
{
// 處理PLC斷開(kāi)連接的情況...
}
}
// 讀取完成,Channel已關(guān)閉
Console.WriteLine("PLC數(shù)據(jù)讀取完畢。");
}, cts.Token);
// ... 其他代碼,如等待所有任務(wù)完成、處理取消邏輯等 ...
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
// ... 其他必要的引用和類(lèi)型定義 ...
// 創(chuàng)建一個(gè)無(wú)邊界的Channel來(lái)發(fā)送和接收消息
var plcDataChannel = Channel.CreateUnbounded<PlcDataMessage>();
// 取消令牌源
var cts = new CancellationTokenSource();
// 啟動(dòng)一個(gè)新的任務(wù)來(lái)模擬PLC數(shù)據(jù)讀取
Task.Run(async () =>
{
Plc s7Plc = null;
bool MyIsConnected = false;
int errorTimes = 0;
try
{
while (!cts.IsCancellationRequested)
{
if (s7Plc == null || !MyIsConnected)
{
// 嘗試連接PLC(略去具體實(shí)現(xiàn))
// ...
if (MyIsConnected)
{
// 連接成功,發(fā)送連接成功消息(如果需要)
// ...
}
}
else
{
try
{
// 讀取PLC數(shù)據(jù)(略去具體實(shí)現(xiàn))
DbData dbDataTemp = await s7Plc.ReadClassAsync<DbData>(42, 0, cts.Token);
// 心跳和其他操作...
// 構(gòu)造消息并發(fā)送到Channel
var message = new PlcDataMessage { IsConnected = MyIsConnected, DbData = dbDataTemp };
await plcDataChannel.Writer.WriteAsync(message, cts.Token);
errorTimes = 0; // 重置錯(cuò)誤計(jì)數(shù)器
}
catch (Exception ex)
{
errorTimes++;
// 處理異常(例如記錄日志)
// ...
// 在達(dá)到一定錯(cuò)誤次數(shù)后,關(guān)閉PLC連接并重置
if (errorTimes > someThreshold)
{
s7Plc?.Close();
s7Plc = null;
MyIsConnected = false;
// 可以選擇發(fā)送一個(gè)斷開(kāi)連接的消息到Channel
}
// 休眠一段時(shí)間后再重試
await Task.Delay(2000, cts.Token);
}
}
// 可以添加一些延時(shí)來(lái)減少循環(huán)的頻率
await Task.Delay(somePollingInterval, cts.Token);
}
}
catch (OperationCanceledException)
{
// 取消是預(yù)期的,不需要額外處理
}
finally
{
// 確保關(guān)閉PLC連接和Channel寫(xiě)入器
s7Plc?.Close();
plcDataChannel.Writer.Complete();
}
}, cts.Token);
// 在另一個(gè)任務(wù)或線程中讀取Channel中的數(shù)據(jù)
Task.Run(async () =>
{
await foreach (var message in plcDataChannel.Reader.ReadAllAsync(cts.Token))
{
if (message.IsConnected)
{
// 更新dbData(這里假設(shè)dbData是一個(gè)線程安全的對(duì)象或結(jié)構(gòu))
// 根據(jù)需要添加適當(dāng)?shù)耐綑C(jī)制
// ...
// 處理讀取到的數(shù)據(jù)...
}
else
{
// 處理PLC斷開(kāi)連接的情況...
}
}
// 讀取完成,Channel已關(guān)閉
Console.WriteLine("PLC數(shù)據(jù)讀取完畢。");
}, cts.Token);
// ... 其他代碼,如等待所有任務(wù)完成、處理取消邏輯等 ...
// 在某個(gè)適當(dāng)?shù)臅r(shí)刻取消任務(wù)
// cts.Cancel();
// 等待所有任務(wù)完成(如果需要
拓展:C# Channel實(shí)現(xiàn)線程間通信
C# Channel實(shí)現(xiàn)線程間通信
同步方式實(shí)現(xiàn):
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ConsoleApp1
{
public class ChannelDemo
{
static Channel<Message> channel1 = Channel.CreateUnbounded<Message>();
public static void Main2()
{
sender.Start(1);
receive1.Start(2);
receive2.Start(3);
sender.Join();
Thread.Sleep(3000);
receive1.Interrupt();
receive2.Interrupt();
receive1.Join();
receive2.Join();
Console.ReadKey();
}
static Thread sender = new Thread(SendMsg);
static Thread receive1 = new Thread(ReceiveMsg);
static Thread receive2 = new Thread(ReceiveMsg);
static void SendMsg(object id)
{
for (int i = 0; i < 20; i++)
{
if (channel1.Writer.TryWrite(new Message((int)id, i.ToString())))
{
Console.WriteLine($"【線程{id}】發(fā)送了【{i}】");
}
}
}
static void ReceiveMsg(object id)
{
try
{
while (true)
{
if (channel1.Reader.TryRead(out Message message))
{
Console.WriteLine($"【線程{id}】從【線程{message.id}】接收了【{message.content}】");
}
Thread.Sleep(1);
}
}
catch (ThreadInterruptedException ex)
{
Console.WriteLine($"接收結(jié)束");
}
}
}
}
異步方式:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Remoting.Channels;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ConsoleApp1
{
public class ChannelDemo2
{
static Channel<Message> channel1 = Channel.CreateUnbounded<Message>();
public static async void Main2()
{
await Task.WhenAll(sender, sender2);
channel1.Writer.Complete();
await Task.WhenAll(receive1, receive2);
Console.ReadKey();
}
static Task sender = SendMsgAsync(channel1.Writer, 1);
static Task sender2 = SendMsgAsync(channel1.Writer, 4);
static Task receive1 = ReceiveMsgAsync(channel1.Reader, 2);
static Task receive2 = ReceiveMsgAsync(channel1.Reader, 3);
static async Task SendMsgAsync(ChannelWriter<Message> writer, int id)
{
for (int i = 0; i < 20; i++)
{
await writer.WriteAsync(new Message((int)id, i.ToString()));
Console.WriteLine($"【線程{id}】發(fā)送了【{i}】");
}
}
static async Task ReceiveMsgAsync(ChannelReader<Message> reader,int id)
{
try
{
while (!reader.Completion.IsCompleted)
{
Message message = await reader.ReadAsync();
Console.WriteLine($"【線程{id}】從【線程{message.id}】接收了【{message.content}】");
}
}
catch (ChannelClosedException ex)
{
Console.WriteLine($"ChannelClosed 接收結(jié)束");
}
}
}
}
在對(duì)Channel進(jìn)行實(shí)例化的時(shí)候,也可以傳遞一個(gè)Options,這里面可以對(duì)消息容量,是否多個(gè)發(fā)送者和接受者進(jìn)行定義。
以上就是C#使用channel實(shí)現(xiàn)Plc異步任務(wù)之間的通信的詳細(xì)內(nèi)容,更多關(guān)于C# channel Plc異步通信的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
C#?WinForm?RichTextBox文本動(dòng)態(tài)滾動(dòng)顯示文本方式
這篇文章主要介紹了C#?WinForm?RichTextBox文本動(dòng)態(tài)滾動(dòng)顯示文本方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
C#使用System.Net郵件發(fā)送功能踩過(guò)的坑
這篇文章主要介紹了C#使用System.Net郵件發(fā)送功能踩過(guò)的坑,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
C#使用ZBar實(shí)現(xiàn)識(shí)別條形碼
目前主流的識(shí)別庫(kù)主要有ZXing.NET和ZBar,本文主要介紹的是如何使用ZBar庫(kù)實(shí)現(xiàn)識(shí)別條形碼功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2023-07-07
c#中SAPI使用總結(jié)——SpVoice的使用方法
最近使用C#重做了點(diǎn)名系統(tǒng)(要用到TTS,讓計(jì)算機(jī)點(diǎn)名)使用了SAPI,在這里總結(jié)一下SpVoice的使用方法。2011-10-10
C#實(shí)現(xiàn)提取Word中插入的多媒體文件(視頻,音頻)
在Word中可將文件通過(guò)OLE對(duì)象嵌入的方式插入到文檔,包括Word、excel、PDF、PPT、圖片、宏文件、文件包等在內(nèi)的多種文件類(lèi)型。本文將利用C#實(shí)現(xiàn)提取插入在Word文件中的這些多媒體文件,感興趣的可以了解一下2022-02-02
C# WinForm開(kāi)發(fā)中使用XML配置文件實(shí)例
這篇文章主要介紹了C# WinForm開(kāi)發(fā)中使用XML配置文件實(shí)例,本文詳細(xì)講解了如何使用一個(gè)XML文件作為WinForm的配置文件,需要的朋友可以參考下2014-08-08

