2

我試圖使用Rx實現一個場景,其中有兩個熱點觀察點。流1和流2.根據流1的數據,我需要啓動流2或停止流2.然後將兩個流數據組合成一個使用CombineLatest。下面的id代碼,我能夠想出。有條件地組合兩個Rx流

  1. 有沒有更好的實現方法?

  2. 而我怎樣才能使它更通用,就像我將有流1,然後流2 .. n爲每個流從2 .. n有條件條件2 .. n利用數據流1檢查如果其他流需要啓動或不能再在CombineLatest的方式將所有的數據

CODE:

 IDisposable TfsDisposable = null; 

     // stream 1 
     var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)); 


     // stream 2 
     var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)).Publish(); 


     var observerHot = hotObs.Do(a => 
     { 
      // Based on Condition to start the second stream 
      if (ConditionToStartStream2) 
      { 
       TfsDisposable = TfsDisposable ?? hotObs2.Connect(); 
      } 
     }) 
     .Do(a => 
     { 
      // Based on condition 2 stop the second stream 
      if (ConditionToStopStream2) 
      { 
       TfsDisposable?.Dispose(); 
       TfsDisposable = null; 
      } 
     }).Publish(); 


     // Merge both the stream using Combine Latest 
     var finalMergedData = hotObs.CombineLatest(hotObs2, (a, b) => { return string.Format("{0}, {1}", a, b); }); 

     // Display the result 
     finalMergedData.Subscribe(a => { Console.WriteLine("result: {0}", a); }); 

     // Start the first hot observable 
     observerHot.Connect(); 

回答

1

有這樣一齣戲:

var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0)); 
var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(0.3)); 

var query = 
    hotObs2.Publish(h2s => 
     hotObs.Publish(hs => 
      hs 
       .Select(a => a % 7 == 0 ? h2s : Observable.Empty<long>()) 
       .Switch() 
       .Merge(hs))); 

這需要兩個observable,並使用在lambda內發佈它們的重載來發布它們。它使它們在lambda範圍內變得很熱,並且防止需要管理對.Connect()的調用。

然後,我只是執行條件檢查(在這種情況下是a甚至),然後返回其他流,如果沒有返回一個空的流。

然後.Switch變成IObservable<long>變成IObservable<long>只通過從最新的內部可觀察到的價值。

最後它與原始hs流合併。

有了上面這個例子中,我得到了以下的輸出:

 
0 
1 
2 
3 
1 
2 
3 
4 
5 
6 
7 
23 
24 
25 
8 
+0

嗨,我嘗試使用上面提到的表達,但在訂閱可觀察的,我沒有得到任何輸出數據流。你能解釋一下嗎? –

+0

@BalrajSingh - 我得到了一個輸出,但它沒有像我期待的那樣行事 - 我有什麼問題。我認爲這將會接近你所需要的。我會盡快處理,並提供解釋。 – Enigmativity

+0

我確實看到了開始第二個流的條件。但是沒有條件結束它。即使結果不是必需的,第二個流會繼續運行嗎? –