2013-06-11 113 views
6

我試圖使用Reactive Extensions(Rx)來緩存任務完成時的枚舉。有誰知道是否有一個乾淨的內置方式做到這一點? ToObservable擴展方法將只是一個IObservable<Task<T>>,這不是我想要的,我想要一個IObservable<T>,然後我可以使用Buffer上。轉換IEnumerable <Task<T>>到IObservable <T>

人爲的例子:

//Method designed to be awaitable 
public static Task<int> makeInt() 
{ 
    return Task.Run(() => 5); 
} 

//In practice, however, I don't want to await each individual task 
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer 
public static void Main() 
{ 
    //Make a bunch of tasks 
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt()); 

    //Is there a built in way to turn this into an Observable that I can then buffer? 
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //???? 

    buffered.Subscribe(ints => { 
     Console.WriteLine(ints.Count()); //Should be 15 
    }); 
} 
+0

http://stackoverflow.com/questions/13500456/how-to-convert-an-ienumerabletaskt-to-iobservablet –

回答

7

您可以使用一個事實,即Task可以轉換爲使用another overload of ToObservable()觀察到。

當您有一個(單項)觀察值的集合時,您可以創建一個包含項目的單個觀察值,因爲它們使用Merge()完成。

所以,你的代碼看起來是這樣的:

futureInts.Select(t => t.ToObservable()) 
      .Merge() 
      .Buffer(15) 
      .Subscribe(ints => Console.WriteLine(ints.Count)); 
相關問題