我正在使用producer/consumer pattern實現數據鏈接層。數據鏈路層有自己的線程和狀態機,用於通過線路(以太網,RS-232 ...)傳輸數據鏈路協議。物理層的接口表示爲System.IO.Stream。另一個線程將消息寫入數據鏈接對象並從中讀取消息。C#等待生產者/消費者中的多個事件
數據鏈路對象具有空閒狀態必須等待的四個條件之一:
- 一個字節被接收
- 消息是可從網絡螺紋
- 保活定時器已過期
- 所有通信都被
我有困難的時候figu網絡層取消在沒有將通信拆分爲讀/寫線程的情況下(這會顯着增加複雜性),最好的方法是實現這一點。以下是我能得到3開出4:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
或
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
我應該怎麼做才能阻止線程,等待所有四個條件?所有建議都歡迎。我沒有設置任何架構或數據結構。
編輯:********感謝大家的幫助。這是我的解決方案********
首先我不認爲有一個生產者/消費者隊列的異步實現。所以我實施了類似this stackoverflow post的東西。
我需要一個外部和內部的取消源來停止消費者線程並分別取消中間任務,similar to this article。
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
CancellationToken internalToken = internalTokenSource.Token;
CancellationToken stopToken = stopTokenSource.Token;
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
{
CancellationToken ct = linkedCts.Token;
Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);
// Wait for at least one task to complete
await Task.WhenAny(readTask, msgTask, keepAliveTask);
// Next cancel the other tasks
internalTokenSource.Cancel();
try {
await Task.WhenAll(readTask, msgTask, keepAliveTask);
} catch (OperationCanceledException e) {
if (e.CancellationToken == stopToken)
throw;
}
if (msgTask.IsCompleted)
// Send the network layer message
else if (readTask.IsCompleted)
// Process the byte from the physical layer
else
Contract.Assert(keepAliveTask.IsCompleted);
// Send a keep alive message
}
}
['await Task.WhenAny(...)'](https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenany%28v=vs.110%29.aspx)可能有幫助。 –