2016-07-27 56 views
0

最後一個事件之後發生的事情有兩個流,這永遠不會完成一個流事件來自第二流。如何獲得從其他流

在開始的時候,它看起來是這樣的:

--a-> 
(?) 
----> 
(=) 
--a-> 

第二流發出它看起來像這樣第一個事件之後:

--a---b-c-d-> 
    (?) 
-----1------> 
    (=) 
------b-c-d-> 

第二流中一個新的事件之後:

--a---b-c-d---e--f-> 
     (?) 
-----1-------2-----> 
     (=) 
--------------e--f-> 

等等...哪一組操作員需要這樣做?

+1

這是一個很好的問題!我將看看這個[RxJS 5運營商的示例:combineLatest](https://gist.github.com/btroncone/d6cf141d6f2c00dc6b35#combinelatest),看看我能不能拿出一些東西。 – wolfhoundjesse

回答

2

您可以使用CombineLatest返回的事件,你想,例如:

/* Have staggering intervals */ 
var source1 = Rx.Observable.interval(1000) 
    .map(function(i) { 

     return { 
      data: (i + 1), 

      time: new Date() 
     }; 
    }); 

var source2 = Rx.Observable.interval(3000).startWith(-1) 
    .map(function(i) { 

     return { 
      data: (i + 1), 
      time: new Date() 
     }; 
    }); 

// Combine latest of source1 and source2 whenever either gives a value with a selector 
var source = Rx.Observable.combineLatest(
    source1, 
    source2, 
    function(s1, s2) { 

     if (s1.time > s2.time) { 
      return s1.data + ', ' + s2.data; 
     } 

     return undefined; 
    } 
).filter(x => x !== undefined).take(10); 

var subscription = source.subscribe(
    function(x) { 
     console.log('Next: %s', x); 
    }, 
    function(err) { 
     console.log('Error: %s', err); 
    }, 
    function() { 
     console.log('Completed'); 
    }); 
0

我不是100%的大理石,因爲他們似乎有點矛盾,但我會提出一個替代方案是使用windowbuffer如果您只需將第二個來源的事件之後發生的事件組合在一起。

var count = 0; 
//Converts the source into an Observable of Observable windows 
source1.window(source2).subscribe(w => { 
    var local = count++; 
    //Just to show that this is indeed an Observable 
    //Subscribe to the inner Observable 
    w.subscribe(x => console.log(local + ': ' + x)); 
}); 

注意,如果你希望他們與其他來源的價值觀相結合然後通過各種手段使用combineLatestwithLatestFrom由其他答案的建議這隻能從第一源發射的值。