2012-11-21 23 views
4

我有兩個訂購IObservable<double> s,並希望將它們合併成一個訂購IObservable<double>。一個例子如下:合併兩個有序IObservables

A 2 3 4 - -  5 - 
B - - - 1 5  - 6 
Out - - - 1 2 3 4 5 - 

的想法是,當它是確定最終訂單的Out纔會產生價值。我相信這應該很容易做到,但我不能提供一個好的解決方案(在這種情況下,儘可能多的由rx運算符組成的好方法);

編輯:我想下面的程序產生下面的輸出

static void Main(string[] args) 
{ 
    var a = new Subject<int>(); 
    var b = new Subject<int>(); 

    a.MergeSort(b).Subscribe(Console.WriteLine); 

    a.OnNext(2); 
    Console.WriteLine("tick"); 
    a.OnNext(4); 
    Console.WriteLine("tick"); 
    a.OnNext(6); 
    Console.WriteLine("tick"); 
    b.OnNext(0); 
    Console.WriteLine("tick"); 
    b.OnNext(1); 
    Console.WriteLine("tick"); 
    b.OnNext(5); 
    Console.WriteLine("tick"); 
    b.OnNext(7); 
    Console.WriteLine("tick"); 
} 

Output: 
tick 
tick 
tick 
0 
tick 
1 
tick 
2 
4 
5 
tick 
6 
tick 

回答

1

這確實它作爲RX擴展操作

public static class MergeMixins 
{ 
    public static IObservable<int> MergeSort(this IObservable<int> This, IObservable<int> other) 
    { 
     return Observable.Create<int>((observer) => 
      { 
       Queue<int> BufferA = new Queue<int>(); 
       Queue<int> BufferB = new Queue<int>(); 

       Action<Queue<int>, int> update = (Queue<int> pushBuffer, int value)=>{ 

        pushBuffer.Enqueue(value); 


        while (BufferA.Count() != 0 && BufferB.Count() != 0) 
        { 
         if (BufferA.Peek() < BufferB.Peek()) 
          observer.OnNext(BufferA.Dequeue()); 
         else 
          observer.OnNext(BufferB.Dequeue()); 
        } 
       }; 

       return new CompositeDisposable(
        This.Subscribe(v => update(BufferA, v)), 
        other.Subscribe(v => update(BufferB, v))); 

      }); 

    } 

} 

廣告我的測試輸出使用您的測試

Result StandardOutput: 
tick 
tick 
tick 
0 
tick 
1 
tick 
2 
4 
5 
tick 
6 
tick 
+0

實際上,代碼並不是線程安全的,因爲我們可以在不同線程上訂閱A和B.代碼可以改進1 – bradgonesurfing

+1

如果你把OnNext放在一個循環中,它將清除一個緩衝區(如果它是建立起來的)(這隻對熱的可觀察物質很重要) –

+0

關於熱的可觀察物體的好處。無論如何,代碼應該被推廣來處理icomparables並且也使線程安全。在這可能有用的情況下,一個現實生活合併的好例子是時間戳信息。 @Matt解決方案是你在找什麼? – bradgonesurfing