2015-06-01 68 views
1

我在回答這個問題:Rx extensions: Where is Parallel.ForEach?爲了使用Rx並行運行一些操作。Rx擴展Parallel.ForEach節流

我遇到的問題是,它似乎分配的每個請求的新線程,而使用Parallel.ForEach確實少得多。

我並行運行的進程內存密集,所以如果我一次嘗試處理數百個項目,提供給鏈接問題的答案很快就會看到內存不足。

有沒有一種方法可以修改該答案以限制在任何給定時間完成的項目數量?

我已經採取了看WindowBuffer操作,我的代碼看起來是這樣的:

return inputs.Select(i => new AccountViewModel(i)) 
    .ToObservable() 
    .ObserveOn(RxApp.MainThreadScheduler) 
    .ToList() 
    .Do(l => 
    { 
     using (Accounts.SuppressChangeNotifications()) 
     { 
      Accounts.AddRange(l); 
     } 
    }) 
    .SelectMany(x => x) 
    .SelectMany(acc => Observable.StartAsync(async() => 
    { 
     var res = await acc.ProcessAsync(config, m, outputPath); 
     processed++; 
     var prog = ((double) processed/inputs.Count())*100.0; 
     OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog)); 
     OverallProgress.Progress.OnNext(prog); 
     return res; 
    })) 
    .All(x => x); 

理想我希望能夠批量起來考慮視圖模型的塊,我則打電話ProcessAsync方法,並且只有一次完成所有批次才能繼續。

理想情況下,我希望它如果即使只有一批完成,它也繼續前進,但只能保持相同的批量大小。所以如果我有一批5和1完成,我想另一個開始,但只有一個,直到有更多的空間可用。

回答

0

像往常一樣,保羅·貝茨已經回答過類似的問題,解決了我的問題:

問題:Reactive Extensions Parallel processing based on specific number

對使用​​Observable.Defer然後沒入批次的一些信息,使用我修改我的以前的代碼是這樣的:

return inputs.Select(i => new AccountViewModel(i)) 
    .ToObservable() 
    .ObserveOn(RxApp.MainThreadScheduler) 
    .ToList() 
    .Do(l => 
    { 
     using (Accounts.SuppressChangeNotifications()) 
     { 
      Accounts.AddRange(l); 
     } 
    }) 
    .SelectMany(x => x) 
    .Select(x => Observable.DeferAsync(async _ => 
    { 
     var res = await x.ProcessAsync(config, m, outputPath); 
     processed++; 
     var prog = ((double) processed/inputs.Count())*100.0; 
     OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog)); 
     OverallProgress.Progress.OnNext(prog); 
     return Observable.Return(res); 
    })) 
    .Merge(5) 
    .All(x => x); 

當然,我得到了滾動完成行爲(例如,如果1/5完成,那麼只有一個開始)。

顯然我有幾個基本知識要掌握,但這太棒了!

+0

Fyi你可以使用'FromAsync'而不是'DeferAsync',它可以讓你的異步lambda更自然地返回它的結果,而不需要用'Return'來包裝它。 – Brandon

+0

另請注意,用於跟蹤和報告進度的代碼具有一些多線程競爭條件,並可能違反某些Rx併發規則。我建議將所有進度跟蹤代碼移到「合併」子句之後的「Do」子句中。這將消除比賽。 – Brandon

+0

@Brandon在合併之後不會將其移動到一個Do,這意味着進度報告只發生在每批之後?我想在每個項目後報告進度。 – Clint