2011-02-07 33 views
0

我是新來的RX,我有我想要的場景工作良好,但在我看來,必須有一個更簡單或更優雅的方式來實現這一點。我所擁有的是IObservable<T>,我想以這樣一種方式訂閱它,最後觸發一個IObservable<U>,,觸發一個異步操作,爲每個它所看到的生成一個U.有沒有更簡單的方法讓IObservable異步依賴另一個IObservable?

我有什麼至今(即偉大工程,但似乎累贅)使用中間事件流是這樣的:

public class Converter { 
    public event EventHandler<UArgs> UDone; 
    public IConnectableObservable<U> ToUs(IObservable<T> ts) { 
    var us = Observable.FromEvent<UArgs>(this, "UDone").Select(e => e.EventArgs.U).Replay(); 
    ts.Subscribe(t => Observable.Start(() => OnUDone(new U(t)))); 
    return us; 
    } 
    private void OnUDone(U u) { 
    var uDone = UDone; 
    if (uDone != null) { 
     uDone(this, u); 
    } 
    } 
} 

... 

var c = new Converter(); 
IConnectableObservable<T> ts = ...; 
var us = c.ToUs(ts); 
us.Connect(); 

... 

我敢肯定,我錯過了一個更簡單的方法嗎這...

+0

您確定要在這裏重播嗎? – 2011-02-09 09:31:34

回答

1

SelectMany應該做你需要什麼,拉平了IO<IO<T>>

Observable.Range(1, 10) 
     .Select(ii => Observable.Start(() => 
      string.Format("{0} {1}", ii, Thread.CurrentThread.ManagedThreadId))) 
     .SelectMany(id=>id) 
     .Subscribe(Console.WriteLine); 
+0

謝謝,斯科特。我對SelectMany瞭如指掌,但出於某種原因,我沒有考慮這個問題,因爲它沒有考慮到(單項)可觀察事物的可觀察性。 – lesscode 2011-02-08 01:50:19

0

這正是SelectMany是:

IObservable<int> ts 

IObservable<string> us = ts.SelectMany(t => StartAsync(t)); 

us.Subscribe(u => 
    Console.WriteLine("StartAsync completed with {0}", u)); 

... 

private IObservable<string> StartAsync(int t) 
{ 
    return Observable.Return(t.ToString()) 
     .Delay(TimeSpan.FromSeconds(1)); 
} 

請記住,如果StartAsync具有可變完成時間,您可以在從輸入值不同的順序接收輸出值。

相關問題