2016-07-15 125 views
1

我正在從一個服務器端控制檯應用程序接收來自多個WCF服務的數據,做了一些工作,然後通過使用SignalR的單個連接轉發結果到IIS服務器。等待消費者使用BlockingCollection作爲隊列的異步方法

我試圖用生產者消費者模式實現這一點,其中WCF服務是生產者,使用SignalR發送數據的類是消費者。對於隊列我使用BlockingCollection

但是,當使用await/async發送消費者while循環中的數據時,直到所有其他線程完成將數據添加到隊列爲止。

出於測試目的,我用Task.Delay(1000).Wait();await Task.Delay(1000);替代了實際發送數據的代碼,兩者都被卡住。 一個簡單的Thread.Sleep(1000);似乎工作得很好,導致我認爲異步代碼是問題。

所以我的問題是:是否有東西阻止在while循環中完成異步代碼?我錯過了什麼?

我開始消費者線程這樣的:

new Thread(Worker).Start(); 

和消費者代碼:

private void Worker() 
{ 
    while (!_queue.IsCompleted) 
    { 
     IMobileMessage msg = null; 
     try 
     { 
      msg = _queue.Take(); 
     } 
     catch (InvalidOperationException) 
     { 
     } 

     if (msg != null) 
     { 
      try 
      { 
       Trace.TraceInformation("Sending: {0}", msg.Name); 
       Thread.Sleep(1000); // <-- works 
       //Task.Delay(1000).Wait(); // <-- doesn't work 
       msg.SentTime = DateTime.UtcNow; 
       Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime); 
      } 
      catch (Exception e) 
      { 
       TraceException(e); 
      } 
     } 
    } 
} 
+2

阻止和異步不是朋友。如果你將'async'與'BlockingCollection '混合在一起,你應該強烈考慮拋開'BlockingCollection '並看看TPL Dataflow。 'BufferBlock '是一個很好的起點,大約相當於'BlockingCollection ',但數據流還有更多可供生產者/消費者使用的場景。花點時間來了解它。這很值得。 – spender

+0

不錯,非常感謝,我會研究它。 –

回答

2

至於花錢正確地指出,BlockingCollection(顧名思義)僅用於與阻止代碼一起使用,並且對於異步代碼而言效果不佳。

有異步兼容的生產者/消費者隊列,如BufferBlock<T>。在這種情況下,我想ActionBlock<T>甚至會更好:

private ActionBlock<IMobileMsg> _block = new ActionBlock<IMobileMsg>(async msg => 
{ 
    try 
    { 
    Trace.TraceInformation("Sending: {0}", msg.Name); 
    await Task.Delay(1000); 
    msg.SentTime = DateTime.UtcNow; 
    Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime); 
    } 
    catch (Exception e) 
    { 
    TraceException(e); 
    } 
}); 

這取代你的整個消費線和主循環。

+0

我添加了您建議替換我的消費者和隊列的代碼,並且還可以在您的博客上閱讀關於TPL數據流的優秀帖子,它看起來像是有效,但我無法確認,因爲現在我的製作人計時器每次在2次滴答後停止。但我想這是新鮮事。我會盡快接受你的回答,我可以確認它的運作!謝謝! –

+0

出於某種原因,我似乎無法得到它的工作......我的生產者檢查是否有每秒從WCF服務的新消息(使用'Task.Delay(1000);'在循環中並將它們張貼到ActionBlock(如果有的話)根據我在動作塊的延遲和生產者延遲中使用的次數,一切正常,完全停止,或者只發送消息到塊緩衝區,但不再處理它們。在生產者中使用async while循環的動作塊? –

+0

@ J.Neijt:不,異步生產者工作正常。如果你的'TraceException'拋出一個異常,那將會阻止這個塊工作。 –