2016-07-22 33 views
3

我正在查詢一個數據庫,並檢索結果作爲一行一行'db_row_receieved'事件的行。我試圖按公司ID對這些結果進行分組,但我在訂閱中沒有輸出。RxJS分組發出事件nodejs

db行格式如下所示。

// row 1 
    { 
     companyId: 50, 
     value: 200 
    } 
    // row 2 
    { 
     companyId: 50, 
     value: 300 
    } 
    // row 3 
    { 
     companyId: 51, 
     value: 400 
    } 

代碼:

var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved'); 
var grouped = source.groupBy((x) => { return x.companyId; }); 
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => { 
          return acc + v.value; 
          }, 0)); 

var subscription = selectMany.subscribe(function (obs) { 
         console.log("value: ", obs); 
        } 

預期輸出:

value: 500 // from the group with companyId 50 
value: 400 // from the group with companyId 51 

實際輸出: 認購不輸出任何東西,但使用Rx.Observable.fromArray工作時(的someArray)

有誰能告訴我我哪裏出了問題嗎?

+0

你確定'eventEmitter'實際上是用給定名稱發射事件。其餘的代碼看起來不錯。 –

+0

您是否嘗試過grouped.subscribe(data => {console.log(data);});看看groupeBy是否可以工作 –

+0

@Yury,是的,eventEmitter確實發出一行。 – user1145347

回答

1

所以問題是reduce只會產生一個單一的值,如果基礎流complete d。由於事件發射器是無限源,它總是處於活動狀態。

看看下面的代碼片段 - 第一個例子完成,另一個沒有。

const data = [ 
 
    {k: 'A', v: 1}, 
 
    {k: 'B', v: 10}, 
 
    {k: 'A', v: 1}, 
 
    {k: 'B', v: 10}, 
 
    {k: 'A', v: 1}, 
 
    {k: 'B', v: 10}, 
 
    {k: 'A', v: 1}, 
 
    {k: 'A', v: 1}, 
 
    {k: 'A', v: 1}, 
 
]; 
 

 
Rx.Observable.from(data) 
 
    .concatMap(d => Rx.Observable.of(d).delay(100)) 
 
    .groupBy(d => d.k) 
 
    .mergeMap(group => group.reduce((acc, value) => { 
 
    acc.sum += value.v; 
 
    return acc; 
 
    }, {key: group.key, sum: 0})) 
 
    .do(d => console.log('RESULT', d.key, d.sum)) 
 
    .subscribe(); 
 
    
 
Rx.Observable.from(data) 
 
    .concatMap(d => Rx.Observable.of(d).delay(100)) 
 
    .merge(Rx.Observable.never()) // MERGIN NEVER IN 
 
    // .take(data.length) // UNCOMMENT TO MITIGATE NEVER 
 
    .groupBy(d => d.k) 
 
    .mergeMap(group => group.reduce((acc, value) => { 
 
    acc.sum += value.v; 
 
    return acc; 
 
    }, {key: group.key, sum: 0})) 
 
    .do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d)) 
 
    .subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>

我不知道你的具體使用情況,但2我想到的最常見的事情是:

  • 使用scan(可能與反跳),
  • 使用takeUntil是否有表示下線流結束的事件。