2015-06-23 121 views
5

我有以下的設計TransformManyBlockTPL數據流塊消耗所有可用內存

  • 輸入:文件路徑
  • 輸出:將IEnumerable的文件的內容,一條線在同一時間

我在一個龐大的文件(61GB)上運行這個塊,這個文件太大而不適合RAM。爲了避免無限制的內存增長,我已經將BoundedCapacity設置爲該塊的非常低的值(例如1)以及所有的下游塊。儘管如此,該塊顯然重複IEnumerable貪婪,這消耗了計算機上的所有可用內存,使每個進程停止。該塊的OutputCount繼續無限制地上升,直到我終止進程。

我該怎麼做才能防止塊以這種方式使用IEnumerable

編輯:這裏是一個能說明問題的例子程序:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

class Program 
{ 
    static IEnumerable<string> GetSequence(char c) 
    { 
     for (var i = 0; i < 1024 * 1024; ++i) 
      yield return new string(c, 1024 * 1024); 
    } 

    static void Main(string[] args) 
    { 
     var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }; 
     var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options); 
     var secondBlock = new ActionBlock<string>(str => 
      { 
       Console.WriteLine(str.Substring(0, 10)); 
       Thread.Sleep(1000); 
      }, options); 

     firstBlock.LinkTo(secondBlock); 
     firstBlock.Completion.ContinueWith(task => 
      { 
       if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception); 
       else secondBlock.Complete(); 
      }); 

     firstBlock.Post('A'); 
     firstBlock.Complete(); 
     for (; ;) 
     { 
      Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount); 
      Thread.Sleep(3000); 
     } 
    } 
} 

如果您使用的是64位中,確保清除在Visual Studio中的「首選32位」選項。我的計算機上有16GB的RAM,並且該程序立即消耗了所有可用的字節。

+0

以及TBH:我沒有時間在這裏與你爭辯 - 祝你好運 – Carsten

+0

好的,謝謝你的反饋。 – brianberns

+0

如果仔細閱讀了本節的其餘部分,您會發現它並不像您想象的那樣工作 - 您的firstBlock總是提供它可以生成的所有內容 - 如果您綁定了第二個,它將會拒絕第二個輸入並在稍後獲取它 – Carsten

回答

3

您似乎誤解了TPL Dataflow的工作原理。

BoundedCapacity限制您可以發佈到塊中的項目數量。在你的情況下,這意味着單個charTransformManyBlock和單個stringActionBlock

因此,您將單個項目張貼到TransformManyBlock,然後返回1024*1024字符串,並嘗試將它們傳遞給ActionBlock,該文件一次只能接受一個。其餘的字符串將只在TransformManyBlock的輸出隊列中。

什麼你可能想要做的是等待(同步或以其他方式)創建一個單獨的塊和後項目到它以流方式時,它達到容量:

private static void Main() 
{ 
    MainAsync().Wait(); 
} 

private static async Task MainAsync() 
{ 
    var block = new ActionBlock<string>(async item => 
    { 
     Console.WriteLine(item.Substring(0, 10)); 
     await Task.Delay(1000); 
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); 

    foreach (var item in GetSequence('A')) 
    { 
     await block.SendAsync(item); 
    } 

    block.Complete(); 
    await block.Completion; 
} 
+0

謝謝。我最終創建了一個封裝源ActionBlock和目標BufferBlock的新塊。您建議的操作塊使用SendAsync來填充緩衝區。對於外部世界來說,它的行爲就像一個具有我想要的行爲的TransformManyBlock。 – brianberns

+0

@brianberns:對不起,如果這是一個愚蠢的問題,但「await block.SendAsync(item)」和「block.Post(item)」之間有什麼區別? – Bugmaster

+0

@Bugmaster這根本不是一個愚蠢的問題:http://stackoverflow.com/a/13605979/885318 – i3arnon