2013-11-20 190 views
0

這裏我們有一個Observable Sequence ...在.NET中使用Rx。合併觀察對象

var aSource = new Subject<int>(); 

var bSource = new Subject<int>(); 

var paired = Observable 
      .Merge(aSource, bSource) 
    .GroupBy(i => i).SelectMany(g => g.Buffer(2).Take(1)); 

paired.Subscribe(g => Console.WriteLine("{0}:{1}", g.ElementAt(0), g.ElementAt(1))); 

aSource.OnNext(4); 
bSource.OnNext(1); 
aSource.OnNext(2); 
bSource.OnNext(5); 
aSource.OnNext(3); 
bSource.OnNext(3); 
aSource.OnNext(5); 
bSource.OnNext(2); 
aSource.OnNext(1); 
bSource.OnNext(4); 

輸出: 3:3 5:5 2:2 1:1 4:4

我們會得到每一個事件,一對數字具有相同的ID到達時間。

完美!正是我想要的。

一組兩個,按價值配對。

下一個問題....

如何獲得價值序列的的SelectMany /緩衝。

因此1,2,3,4,5通過OnNext()到達aSource和bSource。然後啓動1-5的FireWireLine()。然後當2,3,4,5,6到達時,我們得到另一個console.writeline()。任何線索任何人?

隨即,在Rx論壇建議看.Window()

http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

這表面上看起來很完美。就我而言,在這種情況下,我需要一個價值4的窗口。

在查詢序列中它屬於獲取此效果的位置? (a),bSource).GroupBy(i => i).SelectMany(g => g.Buffer(2).Take(1));

輸出 1,2,3,4,5:1,2,3,4,5 2,3,4,5,6 :2,3,4,5,6

此致,

丹尼爾

+0

對於第二部分,數字按順序到達每個源?或隨機順序? –

+0

它可以是隨機的。它們是不同長度「長」過程的結果。 – WebSight

回答

1

假設事件的來源隨機到達,用我的回答"Reordering events with Reactive Extensions"得到事件秩序。

然後使用Observable.Buffer來創建滑動緩衝區:

// get this using the OrderedCollect/Sort in the referenced question 
IObservable<int> orderedSource; 

// then subscribe to this 
orderedSource.Buffer(5, 1); 
+0

謝謝,今晚我回到家時,我會試試這個。我假設根據上面的編輯,Window()是針對有序序列的。 – WebSight

+0

正確,'Window'爲您提供一串流,'Buffer'爲列表流。值得注意的是,你只在關閉時纔得到緩衝區,但窗口流開始立即發送。 –

+0

對,看這個空間....我會在這裏發佈結果與任何進一步的問題。再次感謝詹姆斯。 – WebSight

0

下面是一個擴展方法,當它有n個相同的ID的輸入火災。

public static class RxExtension 
    { 

     public static IObservable<TSource> MergeBuffer<TSource>(this IObservable<TSource> source, Func<TSource, int> keySelector, Func<IList<TSource>,TSource> mergeFunction, int bufferCount) 
     { 
      return Observable.Create<TSource>(o => { 
       var buffer = new Dictionary<int, IList<TSource>>(); 
       return source.Subscribe<TSource>(i => 
       { 
        var index = keySelector(i); 
        if (buffer.ContainsKey(index)) 
        { 
         buffer[index].Add(i); 
        } 
        else 
        { 
         buffer.Add(index, new List<TSource>(){i}); 
        } 
        if (buffer.Count==bufferCount) 
        { 
         o.OnNext(mergeFunction(buffer[index])); 
         buffer.Remove(index); 
        } 
       }); 
      }); 
     } 
    } 

調用擴展。

mainInput = Observable.Merge(inputNodes.ToArray()).MergeBuffer<NodeData>(x => x.id, x => MergeData(x), 1);