我是新來的RX。C#RX(System.Reactive) - 異步 - 發佈一個IEnumerable <DataRow>到多個觀測數據手柄
我想遍歷一個IEnumerable併發布到多個DataHandlers處理它們各自線程中的數據。
下面是我的示例程序。發佈作品和一個新的線程被創建,但3個RowHandlers都在1個線程中運行。我需要3個線程。什麼是實施這個最好的方法?
class Program
{
public class MyDataGenerator
{
public IEnumerable<int> myData()
{
//Heavy lifting....Don't want to process more than once.
yield return 1;
yield return 2;
yield return 3;
yield return 4;
yield return 5;
yield return 6;
}
}
static void Main(string[] args)
{
MyDataGenerator h = new MyDataGenerator();
Console.WriteLine("Thread id " + Thread.CurrentThread.ManagedThreadId.ToString());
//
var shared = h.myData().ToObservable().Publish();
///////////////////////////////
// Row Handling Requirements
//
// 1. Single Scan of IEnumerable.
// 2. Row handlers process data in their own threads.
// 3. OK if scanning thread blocks while data is processed
//
//Create the RowHandlers
MyRowHandler rn1 = new MyRowHandler();
rn1.ido = shared.Subscribe(i => rn1.processID(i));
MyRowHandler rn2 = new MyRowHandler();
rn2.ido = shared.Subscribe(i => rn2.processID(i));
MyRowHandler rn3 = new MyRowHandler();
rn3.ido = shared.Subscribe(i => rn3.processID(i));
//
shared.Connect();
}
public class MyRowHandler
{
public IDisposable ido = null;
public void processID(int i)
{
var o = Observable.Start(() =>
{
Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
Thread.Sleep(30);
Console.WriteLine("Done Thread ID"+Thread.CurrentThread.ManagedThreadId.ToString());
}
);
o.First();
}
}
}
發現:從的Rx
編碼速度&代碼質量增益一個接收來以性能爲代價的。任務/代表無疑更快地成倍增長。這意味着需要了解Rx的最重要的事情是何時使用Rx。以下是總結指南草案。對於大容量,我可以看到Rx在chuncking,combine和其他許多流處理器模型中的使用;然而,基本的異步不應該使用rx。
我會發布的圖像與矩陣準則,但網站不會讓我張貼圖片
您正在使用哪個版本的Rx? – 2012-04-15 05:33:44
從你的解釋中不清楚,你正在解決什麼問題。如果「等待繁重的任務」,那麼異步呼叫就可以做到。如果避免重新計算,就像你說的那樣,比var cachedData = myData()。ToArray()會在沒有Rx的情況下執行。 – 2014-11-21 21:16:28