2013-02-25 90 views
-1

我有我的C#4.0應用程序中的對象列表。假設這個列表包含100個學生類的對象。 Reactive Framework有什麼方法可以每次同時執行10個對象?這可能使用Reactive Framework嗎?

每個學生對象都運行一些耗時約10到15秒的方法。因此,第一次通過時,從列表中取出前10個學生對象,並等待所有10個學生對象完成其工作,然後取下10個學生對象,依此類推直到完成列表中的全部項目?

  1. 我有一個List<Student> 100計數。
  2. 首先從列表中取出10個項目,並且同時調用每個對象的長跑方法。
  3. 接收每個對象的返回值並更新UI [訂閱部分]。
  4. 只有前10輪完成並釋放所有內存時,纔開始下一輪。
  5. 對列表中的所有項目重複相同的過程。
  6. 如何捕捉每個過程中的錯誤?
  7. 如何從內存中釋放每個學生對象的資源和其他資源?
  8. 哪個是在Reactive Framework中完成所有這些事情的最佳方法?

回答

0

我嘗試....

var students = new List<Student>(); 
{....} 
var cancel = students 
    .ToObservable(Scheduler.Default) 
    .Window(10) 
    .Merge(1) 
    .Subscribe(tenStudents => 
    { 
     tenStudents.ObserveOn(Scheduler.Default) 
      .Do(x => DoSomeWork(x)) 
      .ObserverOnDispatcher() 
      .Do(tenStudents => UpdateUI(tenStudents)) 
      .Subscribe();    
    }); 
+0

謝謝阿隆。你能解釋一下你的代碼嗎?非常感謝 – user2017793 2013-02-26 14:12:07

+0

真的很簡單。窗口(10)將工作轉換爲塊10.合併(1)在單個線程上工作。將這10名學生轉換成一個內部可觀察的。呃,做一些工作吧。 ObserveOnDispatcher()返回到下一位的UI線程。 Do ...嗯...在UpdatingUI上工作。最後訂閱內部可觀察。沖洗並重復。 – Aron 2013-02-26 14:55:38

+0

再次感謝阿隆。我的疑問是如何釋放每個10個學生對象資源。你的解釋非常有幫助,非常感謝。我擔心內存不足問題。請幫助我。 – user2017793 2013-02-26 15:08:15

1

這個版本將始終在同一時間運行的10名學生。當學生完成時,另一個將開始。當每個學生完成時,你可以處理它有的任何錯誤,然後清理它(這將在下一個學生開始運行之前發生)。

students 
    .ToObservable() 
    .Select(student => Observable.Defer(() => Observable.Start(() => 
     { 
      // do the work for this student, then return a Tuple of the student plus any error 
      try 
      { 
       student.DoWork(); 
       return { Student = student, Error = (Exception)null }; 
      } 
      catch (Exception e) 
      { 
       return { Student = student, Error = e }; 
      } 
     }))) 
    .Merge(10) // let 10 students be executing in parallel at all times 
    .Subscribe(studentResult => 
    { 
     if (studentResult.Error != null) 
     { 
      // handle error 
     } 

     studentResult.Student.Dispose(); // if your Student is IDisposable and you need to free it up. 
    }); 

這不正是問什麼,因爲它沒有開始下一批次之前完成第一批10。這總是保持10並行運行。如果你真的想批10我會調整的代碼。

+0

感謝您的回覆。我對RX很陌生並試圖研究它。你們都很有幫助。在這個解決方案中,我發現只需要10個學生,然後10個學生完成後,就看不到任何代碼,以便接下來10個學生(如批次),直到它覆蓋100個學生。學生名單集合包含100名學生。由於所有100名學生並行運行可能會導致內存異常,我的計劃是分批運行學生10.請幫助。 – user2017793 2013-02-28 00:26:37

+0

'.Merge(10)'這樣做。把它想象成一個酒吧裏的保鏢。一次只允許10名學生參加。只要其中一名學生完成並離開,那裏只剩下9名學生,所以'Merge'會讓另一名學生進入,直到所有100名學生都被處理完畢。 – Brandon 2013-02-28 03:31:35

+0

感謝幫助。一次真正需要10名學生,下一次需要10個學生。當它需要一批10人時,Rx中是否有任何方法可以運行,這10名學生可以並行運行。上述解決方案正在等待每個批次的完成,然後只會調用訂閱。在訂閱中,我有另一個消息來更新wcf客戶端中的UI。但等待每批的完整執行殺死時間。我正在尋找更快的RX方式。有沒有什麼辦法可以批量並行運行成員? – user2017793 2013-02-28 10:46:34

0

這對我來說聽起來非常像TPL的問題。你有一組已知的數據。您想分割一些繁重的處理並行運行,並且希望能夠批處理負載。

我沒有看到問題的任何地方是異步的源代碼,是運動數據的源代碼或需要被動的消費者。這是我建議您使用TPL的理由。

在另一個註釋中,爲什麼要並行處理10個幻數?這是業務需求,還是潛在的優化性能的嘗試?通常最好的做法是讓TaskPool根據核心數量和當前負載計算出最適合客戶端CPU的最佳實踐。我想,隨着設備及其CPU結構(單核,多核,多核,低功耗/禁用內核等)的巨大變化,這變得越來越重要。

這裏有一種方法可以做到這在LinqPad(但要注意缺乏Rx的)

void Main() 
{ 
    var source = new List<Item>(); 
    for (int i = 0; i < 100; i++){source.Add(new Item(i));} 

    //Put into batches of ten, but only then pass on the item, not the temporary tuple construct. 
    var batches = source.Select((item, idx) =>new {item, idx}) 
         .GroupBy(tuple=>tuple.idx/10, tuple=>tuple.item); 

    //Process one batch at a time (serially), but process the items of the batch in parallel (concurrently). 
    foreach (var batch in batches) 
    { 
     "Processing batch...".Dump(); 
     var results = batch.AsParallel().Select (item => item.Process()); 
     foreach (var result in results) 
     { 
      result.Dump(); 
     } 
     "Processed batch.".Dump(); 
    } 
} 


public class Item 
{ 
    private static readonly Random _rnd = new Random(); 
    private readonly int _id; 
    public Item(int id) 
    { 
     _id = id; 
    } 

    public int Id { get {return _id;} } 

    public double Process() 
    { 
     var threadId = Thread.CurrentThread.ManagedThreadId; 
     string.Format("Processing on thread:{0}", threadId).Dump(Id); 
     var loopCount = _rnd.Next(10000,1000000); 
     Thread.SpinWait(loopCount); 
     return _rnd.NextDouble(); 
    } 
    public override string ToString() 
    { 
     return string.Format("Item:{0}", _id); 
    } 
} 

如果你有一個數據在運動問題或反應我很想找出消費者問題,但只是「淡化」了問題,以便於解釋。