2011-12-07 41 views
21

我想用並行循環這樣處理事情:Parallel.Foreach +良率回報?

public void FillLogs(IEnumerable<IComputer> computers) 
{ 
    Parallel.ForEach(computers, cpt=> 
    { 
     cpt.Logs = cpt.GetRawLogs().ToList(); 
    }); 

} 

好,它工作正常。但是如果我想讓FillLogs方法返回一個IEnumerable,該怎麼辦?

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers) 
{ 
    Parallel.ForEach(computers, cpt=> 
    { 
     cpt.Logs = cpt.GetRawLogs().ToList(); 
     yield return cpt // KO, don't work 
    }); 

} 

編輯

它似乎不是可能的...但我用這樣的:

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers) 
{ 
    return computers.AsParallel().Select(cpt => cpt); 
} 

但是,在我把cpt.Logs = cpt.GetRawLogs().ToList();指令

+0

你的'IEnumerable 'return-type不會帶負載。 –

回答

12

短版本 - 不,這是不可能通過迭代器塊;較長的版本可能涉及調用者的迭代器線程(執行出列隊列)和並行工作者(執行入隊隊列)之間的同步隊列/出隊列;但作爲一個方面說明 - 日誌通常是IO限定的,並行IO限制的事情往往不能很好地工作。

如果來訪者要消耗每需要一定的時間,那麼可能會有一定道理的做法是一次只能處理一個記錄,但能做到這一點呼叫者在消費之前的登錄;即它開始 a Task對於下一個項目之前的yield,並等待完成yield之後......但這又是,很複雜。作爲一個簡單的例子:

static void Main() 
{ 
    foreach(string s in Get()) 
    { 
     Console.WriteLine(s); 
    } 
} 

static IEnumerable<string> Get() { 
    var source = new[] {1, 2, 3, 4, 5}; 
    Task<string> outstandingItem = null; 
    Func<object, string> transform = x => ProcessItem((int) x); 
    foreach(var item in source) 
    { 
     var tmp = outstandingItem; 

     // note: passed in as "state", not captured, so not a foreach/capture bug 
     outstandingItem = new Task<string>(transform, item); 
     outstandingItem.Start(); 

     if (tmp != null) yield return tmp.Result; 
    } 
    if (outstandingItem != null) yield return outstandingItem.Result; 
} 
static string ProcessItem(int i) 
{ 
    return i.ToString(); 
} 
+0

不完全,但我有一個類似的問題(過分忽視:))與返回產量結果parallel.foreach。不同的背景下,我想到它可能會幫助某人。 http://stackoverflow.com/questions/32183463/why-i-get-duplicated-values-during-a-parallel-task-execution-using-an-iterator – Spock

4

我不不想冒犯,但也許缺乏理解。 Parallel.ForEach意味着TPL將按照幾個線程中的可用硬件運行該foreach。但這意味着,ii可以同時完成這項工作! yield return讓你有機會從列表中獲得一些值(或者什麼都不是),並在需要時一個接一個地給它們。它可以防止需要首先查找與條件匹配的所有項目,然後遍歷它們。這確實是一個性能優勢,但不能並行完成。

+1

但是,如果每一代產量返回值都需要一定的時間,那麼您是否會想要並行處理下一個產量返回值,以便更快地得到它。我想有一個緩衝區或什麼? 我不知道是否缺乏理解,但我可以想象(即使理解)情況下人們想要更快地產生收益。 我猜想收益率的意圖是在必要時進行處理,所以也許收益率回報並不嚴格。但我當然可以想象什麼是想要的... –

0

如何

  Queue<string> qu = new Queue<string>(); 
      bool finished = false; 
      Task.Factory.StartNew(() => 
      { 
       Parallel.ForEach(get_list(), (item) => 
       { 
        string itemToReturn = heavyWorkOnItem(item);   
        lock (qu) 
         qu.Enqueue(itemToReturn);       
       }); 
       finished = true; 
      }); 

      while (!finished) 
      { 
       lock (qu) 
        while (qu.Count > 0) 
         yield return qu.Dequeue(); 
       //maybe a thread sleep here? 
      } 

編輯: 我覺得這是更好的:

 public static IEnumerable<TOutput> ParallelYieldReturn<TSource, TOutput>(this IEnumerable<TSource> source, Func<TSource, TOutput> func) 
     { 
      ConcurrentQueue<TOutput> qu = new ConcurrentQueue<TOutput>(); 
      bool finished = false; 
      AutoResetEvent re = new AutoResetEvent(false); 
      Task.Factory.StartNew(() => 
      { 
       Parallel.ForEach(source, (item) => 
       { 
        qu.Enqueue(func(item)); 
        re.Set(); 
       }); 
       finished = true; 
       re.Set(); 
      }); 

      while (!finished) 
      { 
       re.WaitOne(); 
       while (qu.Count > 0) 
       { 
        TOutput res; 
        if (qu.TryDequeue(out res)) 
         yield return res; 
       } 
      } 
     } 

EDIT2:我同意短答案。這段代碼沒用,你不能打破良品循環。