2016-03-02 47 views
6

我正在使用producer/consumer pattern實現數據鏈接層。數據鏈路層有自己的線程和狀態機,用於通過線路(以太網,RS-232 ...)傳輸數據鏈路協議。物理層的接口表示爲System.IO.Stream。另一個線程將消息寫入數據鏈接對象並從中讀取消息。C#等待生產者/消費者中的多個事件

數據鏈路對象具有空閒狀態必須等待的四個條件之一:

  1. 一個字節被接收
  2. 消息是可從網絡螺紋
  3. 保活定時器已過期
  4. 所有通信都被

我有困難的時候figu網絡層取消在沒有將通信拆分爲讀/寫線程的情況下(這會顯着增加複雜性),最好的方法是實現這一點。以下是我能得到3開出4:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token. 
stream.ReadTimeout = 10000; 
await stream.ReadAsync(buf, 0, 1, cts.Token); 

BlockingCollection<byte[]> SendQueue = new ...; 
... 
// Check for a message from network layer. Timeout after 10 seconds. 
// Monitor cancellation token. 
SendQueue.TryTake(out msg, 10000, cts.Token); 

我應該怎麼做才能阻止線程,等待所有四個條件?所有建議都歡迎。我沒有設置任何架構或數據結構。

編輯:********感謝大家的幫助。這是我的解決方案********

首先我不認爲有一個生產者/消費者隊列的異步實現。所以我實施了類似this stackoverflow post的東西。

我需要一個外部和內部的取消源來停止消費者線程並分別取消中間任務,similar to this article

byte[] buf = new byte[1]; 
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource()) 
{ 
    CancellationToken internalToken = internalTokenSource.Token; 
    CancellationToken stopToken = stopTokenSource.Token; 
    using (CancellationTokenSource linkedCts = 
     CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken)) 
    { 
     CancellationToken ct = linkedCts.Token; 
     Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct); 
     Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct); 
     Task keepAliveTask = Task.Delay(m_keepAliveTime, ct); 

     // Wait for at least one task to complete 
     await Task.WhenAny(readTask, msgTask, keepAliveTask); 

     // Next cancel the other tasks 
     internalTokenSource.Cancel(); 
     try { 
      await Task.WhenAll(readTask, msgTask, keepAliveTask); 
     } catch (OperationCanceledException e) { 
      if (e.CancellationToken == stopToken) 
       throw; 
     } 

     if (msgTask.IsCompleted) 
      // Send the network layer message 
     else if (readTask.IsCompleted) 
      // Process the byte from the physical layer 
     else 
      Contract.Assert(keepAliveTask.IsCompleted); 
      // Send a keep alive message 
    } 
} 
+3

['await Task.WhenAny(...)'](https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenany%28v=vs.110%29.aspx)可能有幫助。 –

回答

2

在這種情況下,我只會用取消標記註銷。像保活計時器一樣的重複超時可以更好地表示爲計時器。

所以,我會把它建模爲三個可取消任務。首先,將取消標記:

的所有通信是由網絡層取消

CancellationToken token = ...; 

然後,三個併發操作:

一個字節被接收

var readByteTask = stream.ReadAsync(buf, 0, 1, token); 

保活定時器超時

var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 

消息是從網線

這一個是有點棘手。您當前的代碼使用BlockingCollection<T>,它不是異步兼容的。我建議切換到TPL Dataflow's BufferBlock<T>my own AsyncProducerConsumerQueue<T>,它們中的任何一個都可以用作異步兼容的生產者/消費者隊列(意味着生產者可以是同步或異步的,消費者可以是同步的或異步的)。

BufferBlock<byte[]> SendQueue = new ...; 
... 
var messageTask = SendQueue.ReceiveAsync(token); 

然後你可以使用Task.WhenAny,以確定這些任務的完成:

var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask); 

現在,你可以通過比較completedTask別人和await檢索結果荷蘭國際集團他們:

if (completedTask == readByteTask) 
{ 
    // Throw an exception if there was a read error or cancellation. 
    await readByteTask; 
    var byte = buf[0]; 
    ... 
    // Continue reading 
    readByteTask = stream.ReadAsync(buf, 0, 1, token); 
} 
else if (completedTask == keepAliveTimerTask) 
{ 
    // Throw an exception if there was a cancellation. 
    await keepAliveTimerTask; 
    ... 
    // Restart keepalive timer. 
    keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 
} 
else if (completedTask == messageTask) 
{ 
    // Throw an exception if there was a cancellation (or the SendQueue was marked as completed) 
    byte[] message = await messageTask; 
    ... 
    // Continue reading 
    messageTask = SendQueue.ReceiveAsync(token); 
} 
+0

希望我之前看到過這個,因爲我去了並實現了自己的AsyncQueue,雖然不像您的一般。你爲什麼要等待每個人的任務?它不應該完成嗎? –

+0

@BrianHeilig:有必要檢索結果。這些任務已經完成,但他們的結果沒有被觀察到。結果可以是數據,例如'messageTask',但結果也可以是例外。 'keepAliveTimerTask'可以有一個異常,如果它被取消,'readByteTask'可以有一個異常,如果它被取消或者有一些I/O讀取錯誤。如果存在,「等待」將(重新)提出這些例外。 –

1

取消讀取不會讓您知道數據是否被讀取。取消和閱讀不是相對於原子的。只有在取消後關閉流,該方法纔有效。

隊列方法更好。您可以創建鏈接的CancellationTokenSource,該鏈接在您需要時即被取消。而不是通過cts.Token你傳遞一個你控制的令牌。

然後,您可以根據時間,另一個標記以及您喜歡的任何其他事件發出該標記的信號。如果使用內置超時,則隊列將在內部執行相同的操作,以將傳入的令牌與超時鏈接起來。

+0

對不起,我的細節有點不足。 cts是由數據鏈路層擁有的CancellationTokenSource。取消也由數據鏈路層控制;網絡線程調用停止,取消所有通信並等待線程完成。我們的目標是平穩而快速地停止數據鏈路層。 –

+1

爲什麼這個答案不起作用?你可以在任何你喜歡的條件下使TryTake中止。 – usr

+0

我想我明白了。你建議我在網絡線程取消或一個字節可用時發出取消令牌的信號,對嗎?我想我需要另一個線程同步從流中讀取,然後在讀取完成時發出取消標記。我需要一個變量來確定什麼被取消(讀取或停止)。我還需要在取消消息後取消讀取線程,對嗎?或者我錯過了什麼? –

3

我會與您的選擇兩個,等待任何4種情況發生。假設你已經有4個任務已經awaitable方法:

var task1 = WaitForByteReceivedAsync(); 
var task2 = WaitForMessageAvailableAsync(); 
var task3 = WaitForKeepAliveTimerAsync(); 
var task4 = WaitForCommunicationCancelledAsync(); 

// now gather them 
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{ 
task1, task2, task3, task4 
}; 

// Wait for any of the things to complete 
var result = await Task.WhenAny(theTasks); 

上面的代碼將立即恢復第一個任務完成後,而忽視了其他3。

注:

the documentation for WhenAny,它說:

返回的任務將始終與它的結果的RanToCompletion狀態設置爲第一任務完成結束。即使完成的第一項任務以「已取消」或「已故」狀態結束,也是如此。

相信發生了什麼事之前,所以一定要做到這一點最後的檢查:

if(result.Result.Result == true) 
... // First Result is the task, the second is the bool that the task returns 
+1

我認爲你的意思是WhenAny() –

+1

不。WhenAny返回任何任務返回的時刻。當所有的任務都等待完成,並返回一個任務本身(而不是WaitAll() –

+1

這是行不通的,因爲字節讀取的結果可能會被丟棄,我認爲他關心的數據不會丟失。 – usr