2012-04-14 53 views
1

我是新來的RX。C#RX(System.Reactive) - 異步 - 發佈一個IEnumerable <DataRow>到多個觀測數據手柄

我想遍歷一個IEnumerable併發布到多個DataHandlers處理它們各自線程中的數據。

下面是我的示例程序。發佈作品和一個新的線程被創建,但3個RowHandlers都在1個線程中運行。我需要3個線程。什麼是實施這個最好的方法?

class Program 
{ 

    public class MyDataGenerator 
    { 
     public IEnumerable<int> myData() 
     { 
      //Heavy lifting....Don't want to process more than once. 
      yield return 1; 
      yield return 2; 
      yield return 3; 
      yield return 4; 
      yield return 5; 
      yield return 6; 
     } 
    } 

    static void Main(string[] args) 
    { 
     MyDataGenerator h = new MyDataGenerator(); 
     Console.WriteLine("Thread id " + Thread.CurrentThread.ManagedThreadId.ToString()); 
     // 
     var shared = h.myData().ToObservable().Publish(); 
     /////////////////////////////// 
     // Row Handling Requirements 
     // 
     //  1. Single Scan of IEnumerable. 
     //  2. Row handlers process data in their own threads. 
     //  3. OK if scanning thread blocks while data is processed 
     // 

     //Create the RowHandlers 
     MyRowHandler rn1 = new MyRowHandler(); 
     rn1.ido = shared.Subscribe(i => rn1.processID(i)); 
     MyRowHandler rn2 = new MyRowHandler(); 
     rn2.ido = shared.Subscribe(i => rn2.processID(i)); 
     MyRowHandler rn3 = new MyRowHandler(); 
     rn3.ido = shared.Subscribe(i => rn3.processID(i)); 
     // 
     shared.Connect(); 
    } 

    public class MyRowHandler 
    { 
     public IDisposable ido = null; 
     public void processID(int i) 
     { 
      var o = Observable.Start(() => 
             { 
              Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i)); 
              Thread.Sleep(30); 
              Console.WriteLine("Done Thread ID"+Thread.CurrentThread.ManagedThreadId.ToString()); 
             } 
            ); 
      o.First(); 
     } 
    } 
} 

發現:從的Rx

編碼速度&代碼質量增益一個接收來以性能爲代價的。任務/代表無疑更快地成倍增長。這意味着需要了解Rx的最重要的事情是何時使用Rx。以下是總結指南草案。對於大容量,我可以看到Rx在chuncking,combine和其他許多流處理器模型中的使用;然而,基本的異步不應該使用rx。

我會發布的圖像與矩陣準則,但網站不會讓我張貼圖片

+0

您正在使用哪個版本的Rx? – 2012-04-15 05:33:44

+0

從你的解釋中不清楚,你正在解決什麼問題。如果「等待繁重的任務」,那麼異步呼叫就可以做到。如果避免重新計算,就像你說的那樣,比var cachedData = myData()。ToArray()會在沒有Rx的情況下執行。 – 2014-11-21 21:16:28

回答

3

如果我正確理解你的排序要求,你想三個平行運行掃描,你可以觀察的TaskPool並從那裏訂閱;

... 

//Create the RowHandlers 
MyRowHandler rn1 = new MyRowHandler(); 
rn1.ido = shared.ObserveOn(Scheduler.TaskPool).Subscribe(i => rn1.processID(i)); 

... 

注意,因爲你然後異步運行,你的主線程不等待掃描得到完成,你的程序將終止馬上除非你比如把Console.ReadKey()在程序結束。

編輯:關於運行相同的線程「一路」,你有點奇怪的安排。如果你在rowhandler中刪除了observable,你可以使用Scheduler.NewThread並獲得好的結果;

... 

var rowHandler1 = new MyRowHandler(); 
rowHandler1.ido = shared.ObserveOn(Scheduler.NewThread).Subscribe(rowHandler1.ProcessID); 

... 

public void ProcessID(int i) 
{ 
    Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i)); 
    Thread.Sleep(30); 
    Console.WriteLine("Done Thread ID" + Thread.CurrentThread.ManagedThreadId.ToString(CultureInfo.InvariantCulture)); 
} 

這會給每個訂閱自己的線程,並保持它。

+0

太棒了。像魅力一樣工作。我還沒有在API文檔和示例中獲得ObserveOn。我在.NET 3.5上,所以我不得不使用Scheduler.ThreadPool。 TaskPool似乎在名稱空間中。 – user1332889 2012-04-14 08:27:15

+0

不幸的是,Rx似乎比手動管理線程要慢很多。就好像每一行強制一個新的ThreadPool請求......每次釋放並分配一個新線程。這是真的?或者有沒有辦法讓它保持相同的線程,直到所有的行都被處理? – user1332889 2012-04-14 13:48:01

+0

@ user1332889更新的答案與信息如何實現。 – 2012-04-14 15:52:55