2016-10-31 61 views
2

我正在試驗TPL Dataflow,然後將其移植到我的生產代碼中。 生產代碼是一個經典的生產者/消費者系統 - 生產者生產消息(與金融領域相關),消費者處理這些消息。TPL Dataflow - 生產速度非常快,消費者速度不是很快OutOfMemory異常

我感興趣的是,如果在某一點上生產者產生的速度遠遠超過消費者能夠處理它的速度(系統將會爆炸,或者會發生什麼),環境將會保持多麼穩定&更重要的是什麼在這些情況下做。

所以試圖有類似的簡單的應用程序,我想出了以下內容。

var bufferBlock = new BufferBlock<Item>(); 

    var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions 
         { 
          MaxDegreeOfParallelism = Environment.ProcessorCount 
          , 
          BoundedCapacity = 100000 
         }; 

     var dataFlowLinkOptions = new DataflowLinkOptions 
         { 
          PropagateCompletion = true 
         }; 

     var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t), 
executiondataflowBlockOptions); 

      bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions); 
      for (int i = 0; i < int.MaxValue; i++) 
      { 
       bufferBlock.SendAsync(GenerateItem()); 
      } 

      bufferBlock.Complete(); 
      Console.ReadLine(); 

Item是一個非常簡單的類

internal class Item 
    { 
     public Item(string itemId) 
     { 
      ItemId = itemId; 
     } 

     public string ItemId { get; } 
    } 

GenerateItem只是新聞了Item

static Item GenerateItem() 
{ 
    return new Item(Guid.NewGuid().ToString()); 
} 

現在,模仿沒有那麼快消費者 - 我做了ProcessItem舉行的100ms

static async Task ProcessItem(Item item) 
{ 
    await Task.Delay(TimeSpan.FromMilliseconds(100)); 
    Console.WriteLine($"Processing #{item.ItemId} item."); 
} 

執行此操作會在20秒內產生OOM異常。

然後,我繼續添加更多消費者(更多ActionBlocks達10個),這會贏得更多時間,但最終會導致相同的OOM異常。我也注意到GC是在巨大的壓力下(VS 2015診斷工具顯示GC幾乎是在所有時間運行),所以我介紹了對象池(很簡單的一個,本質上它是ConcurrentBag存儲項)爲Item,但仍然我正在碰壁(OOM異常被拋出)。

要詳細說明內存中的內容,爲什麼內存不足。

  • 最大尺寸有類型的對象SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item> & ConcurrentQueue+Segment<TplDataFlow.Item>
  • 我看到的BufferBlock INPUTBUFFER充滿Item S(計數= 14562296)
  • 由於我設置BoundedCapacityActionBlock(S),其輸入緩衝區也接近可配置的數字(InputCount = 99,996)

爲了確保較慢的生產者能夠使消費者有可能跟上,我做製片迭代之間睡覺:

for (int i = 0; i < int.MaxValue; i++) 
{ 
    Thread.Sleep(TimeSpan.FromMilliseconds(50)); 
    bufferBlock.SendAsync(GenerateItem()); 
} 

,它工作正常 - 不會拋出異常,內存使用率正在不斷低,我看不出有任何GC的壓力了。

所以,我有幾個問題

  1. 我做任何事情本身錯了,而試圖複製速度非常快生產者/慢速消費者(S)的情況與TPL數據流積木
  2. 有什麼辦法使這項工作,而不是與OOM異常失敗。
  3. 關於最佳實踐的任何意見/鏈接如何在TPL數據流上下文中處理這種場景(非常快速的生產者/緩慢的消費者)。
  4. 我對這個問題的理解是 - 因爲消費者無法跟上,所以內部緩衝區充滿了非常快的消息,並且持續到消息,直到一些消費者回來要求下一條消息作爲結果應用程序內存不足(由於填充了內部緩衝區BufferBlock) - 您是否同意這一點?

我正在使用Microsoft.Tpl.Dataflow package -version 4.5.24。 .NET 4.5(C#6)。進程是32位。

回答

5

您已經很好地發現了問題:BufferBlock正在填充其輸入緩衝區,直到它遇到OOM。

要解決這個問題,您還應該在緩衝塊中添加一個BoundedCapacity選項。這將自動爲您節制製作人(您的製作人無需使用Thread.Sleep)。

+0

感謝您的確認!你會評論BufferBlock的BoundedCapacity行爲嗎?特別是一旦內部緩衝區已滿,傳入消息會發生什麼情況?他們會被放棄嗎? (現在看來是這樣)。你是否可以在你的書中提到更多細節? – Michael

+0

@Michael:郵件永遠不會丟失。如果你將一個消息發佈到一個完整的塊,它將返回'false'(表示消息未被接受)。如果您將「SendAsync」消息等待到完整的塊,它將(異步)等待有空間然後發佈消息。看[這個答案](http://stackoverflow.com/a/13605979/263693)。 –

+0

謝謝@StephenCleary! – Michael

相關問題