2015-12-05 83 views
1

我無法篩選基於groupBy observable的Observable。Group Observables,然後通過計數篩選

這裏是一個基於groupBy()文檔的例子:

var codes = [ 
    { keyCode: 38}, // up 
    { keyCode: 38}, // up 
    { keyCode: 40}, // down 
    { keyCode: 40}, // down 
    { keyCode: 37}, // left 
    { keyCode: 39}, // right 
    { keyCode: 37}, // left 
    { keyCode: 39}, // right 
    { keyCode: 66}, // b 
    { keyCode: 65} // a 
]; 

var source = Rx.Observable.from(codes) 
    .groupBy(
    function (x) { return x.keyCode; }, 
    function (x) { return x.keyCode; } 
) 
    .flatMap(obs => 
    obs.count() 
     .filter(x => x > 1) 
     .flatMap(obs.toArray()) 
) 

source.subscribe(console.log) 

這將返回:

[] 
[] 
[] 
[] 

因此,觀測值都被過濾,但它們的價值都沒有了。

回答

2

將observables作爲單個對象(比如數組)而不是作爲值的異步序列來推理是很棘手的。這裏發生的是:

  • 你把obs觀察到的,然後你申請count

    count將等待觀察的完成,將輸出一個值,元素的數量,然後完成。

  • 您可以過濾該值,但是您應用toArrayobs已完成。這就是爲什麼你得到一個空陣列。

所以這裏的問題是一樣的obs變量是在不同的時間不同的東西,所以你不能避免來思考的時候,當你在你的程序的不同地方重複使用相同的變量。

現在,如果您想要將計數編號輸出到subscribe,那麼您應該能夠直接編寫obs.count().filter(x => x > 1).toArray()而不是您現在擁有的編號。 flatMap接受承諾和陣列除了作爲選擇器函數的返回值observables:

比較。 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/selectmany.md

UPDATE: 與考慮到的意見,這是我想出了(待測試):

var source = Rx.Observable 
    .from(codes) 
    .scan(function (acc, code) { 
      acc[code] = acc[code] + 1; 
      acc.latest = code; 
      return acc; 
      }, {}) 
    .filter(function (acc) {return acc[acc.latest] > 1;}) 
    .flatMap(function (acc) {return acc[acc.latest] == 2? Rx.Observable.repeat(acc.latest, 2) : Rx.just(acc.latest);}) 

如果你能等到有所有的值進行處理,可以停止以上過濾後添加.last()。這將使您獲得所有值的最後一個累加器。從那裏你可以做你想做的事。

+0

的事情是,我不希望得到計數出到'subscribe' - 我需要保留原來的觀測。 –

+0

你是什麼意思? 'obs => obs.toArray()'?但是那麼你會丟失計數信息。您想做什麼? – user3743222

+1

'obs.count().filter(x => x> 1).toArray()'返回數組返回值'count',但我需要原始數組(即原始可觀察數) –

0

我想出了類似的解決方案,以user3743222's answer

var filtered = source 
    .flatMap(obs => { 
     var accumulated = obs.scan((a,b) => a.concat(b), []) 
     return obs 
     .scan((a) => a + 1, 0) 
     .filter(i => i > 1) 
     .withLatestFrom(
      accumulated, 
      (a, b) => Rx.Observable.from(b) 
     ) 
    }) 
    .flatMap(d => d) 
    .subscribe(d => console.log(d))