2017-04-20 40 views
0

我正在使用一個內存飢渴的應用程序,它使Task的使用能夠並行處理。問題是它產生了大量的內存,然後掛在它上面,使我的16GByte系統超載,直到GC運行。在這一點上,它的表現非常糟糕,可能需要幾天時間才能完成。原始應用程序通常需要30分鐘才能運行。這是這一個精簡版:內存飢餓的多線程應用程序

class Program 
{ 
    static void Main(string[] args) 
    { 
     var tasks = new List<Task<string[]>>(); 
     var report = new List<string>(); 

     for (int i = 0; i < 2000; i++) 
     { 
      tasks.Add(Task<string[]>.Factory.StartNew(DummyProcess.Process)); 
     } 

     foreach (var task in tasks) 
     { 
      report.AddRange(task.Result); 
     } 

     Console.WriteLine("Press RETURN..."); 
     Console.ReadLine(); 
    } 
} 

這裏的「處理器」:

public static class DummyProcess 
{ 
    public static string[] Process() 
    { 
     var result = new List<string>(); 

     for (int i = 1; i < 10000000; i++) 
     { 
      result.Add($"This is a dummy string of some length [{i}]"); 
     } 

     var random = new Random(); 
     var delay = random.Next(100, 300); 

     Thread.Sleep(delay); 

     return result.ToArray(); 
    } 
} 

我相信這個問題是在這裏:

foreach (var task in tasks) 
{ 
    report.AddRange(task.Result); 
} 

任務沒有得到處置當他們完成時 - 獲取結果(字符串[])離開任務然後處置任務的最佳方式是什麼?

我也試試這個:

foreach (var task in tasks) 
{ 
    report.AddRange(task.Result); 
    task.Dispose(); 
} 

但是沒有太大的區別。我可能會嘗試的是簡單地停止返回的結果,這樣巨大的10 - 50 MB字符串不會被保留(在原始應用程序中)。

編輯:我想更換代碼如下讀取結果:

while (tasks.Any()) 
{ 
    var listCopy = tasks.ToList(); 

    foreach (var task in listCopy) 
    { 
     if (task.Wait(0)) 
     { 
      report.AddRange(task.Result); 
      tasks.Remove(task); 
      task.Dispose(); 
     } 
    } 

    Thread.Sleep(300); 
} 

我有兩個小時後放棄 - 我會讓它運行整夜今晚,看看它是否完成。內存使用似乎更好,但仍然很慢。

回答

0

你是對的,問題是有

foreach (var task in tasks) 
{ 
    report.AddRange(task.Result); 
} 

但問題是比你想象的要大得多。每調用Result都會阻止調用線程,從而有效地將您的代碼轉換爲過度設計的串行版本,該版本中也有一些Sleeps,太糟糕了!

我建議加入continuation到每一個任務首先將你的代碼轉換成並行版本,例如:

task.ContinueWith(t => { 
    //NOTE1 that t.Result is already ready here 
    //NOTE2 you need synchronization for your data structure, mutex or synchronized collection 
    report.AddRange(t.Result); 
}); 

一旦多數民衆贊成在做我也建議每一個任務把自己從任務列表中刪除,這將讓GC收集它儘快地順着因此它擁有,我建議使用明確的處置只能作爲最後的手段在這裏,總之:

task.ContinueWith(t => { 
    //NOTE1 that t.Result is already ready here 
    //NOTE2 you need synchronization for your data structure, mutex or synchronized collection 
    report.AddRange(t.Result); 
    //NOTE3 Synchronize access to task list! 
    tasks.Remove(t); 
}); 

,或者,喲ü可以去一個水平之上基於任務的並行,並從一開始就申請Parallel方法:

ParallelLoopResult result = Parallel.For(0, 2000, ctr => { 
    // NOTE you still need to synchronize access to report 
    report.Add(/*get your result*/); 
}); 

套用this answer:雖然結果是一樣的,這將介紹遠不如開銷比的任務,特別是對於大集合類似你的(2000件),並導致整體運行時間更快。

0

task.Result將保留對結果數組的引用,直到該任務不再可從任何根訪問。這意味着所有結果數組都將一直存在,直到tasks列表超出範圍。

此外,您創建2000個線程,這意味着最多可以有2000組結果數據同時等待。如果您更改爲消費者生產者模式,並且線程執行可容納2000個作業的工作隊列,那麼使用內存「在飛行中」的事情就會減少。使用諸如TPL Dataflow之類的工具,您可以創建一個管道,該管道的工作人員數量有限,並且工人不會開始新的工作,直到以前的工人通過鏈中的下一個鏈接處理完其工作。

static void Main(string[] args) 
    { 
     var report = new List<string>(); 

                 //We don't use i because you did not have Process accept a parameter of any kind. 
     var producer = new TransformBlock<int, string[]>((i) => DummyProcess.Process(), new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = Environment.ProcessorCount}); 

     //Only 20 processed items can be in flight at once, if the queue is full it will block the producer threads which there only is Environment.ProcessorCount of them. 
     //Only 1 thread is used for the consumer. 
     var consumer = new ActionBlock<string[]>((result) => report.AddRange(result), new ExecutionDataflowBlockOptions{BoundedCapacity = 20}); 
     producer.LinkTo(consumer, new DataflowLinkOptions {PropagateCompletion = true}); 

     for (int i = 0; i < 2000; i++) 
     { 
      //We just add dummy values to queue up 2000 items to be processed. 
      producer.Post(i); 
     } 
     //Signals we are done adding to the producer. 
     producer.Complete(); 

     //Waits for the consumer to finish processing all pending items. 
     consumer.Completion.Wait(); 

     Console.WriteLine("Press RETURN..."); 
     Console.ReadLine(); 
    } 
}