2017-10-08 52 views
0

我需要獲取兩個流的重複項目。我認爲我幾乎可以設法做到這一點,但只有當第二流的重複項目順利進行時纔會這樣做。對於前:RxJs - 獲取兩個可觀察對象的重複項目

這工作:

first = Observable.of(1, 2, 3) 
second = Observable.of(2, 3, 1) 

但這並不:

first = Observable.of(1, 4, 3) 
second = Observable.of(1, 2, 3) 

當我的循環獲取到4,它打破:

EmptyError {name: "EmptyError", stack: "EmptyError: no elements in sequence↵ at new Emp…e (http://localhost:4200/vendor.bundle.js:161:22)", message: "no elements in sequence"}

整我代碼在一個功能中,您可以複製/粘貼並測試它:

findDublicates() { 

    let match = 0; // setting it to 0, so later could assign other number 
    let keys = []; // list of maching keys 
    let elementAt = 0; // index of item of first observable   

    let allKeys$; 
    let validKeys$; 

    // counting the length of both observables, so this will be the number of loops 
    // that checks for dublicates 
    let allKeysLength; 
    let validKeysLength; 
    let allKeysLength$ = Observable.of(2, 1, 4, 5, 7).count() 
    allKeysLength$.subscribe(val => allKeysLength = val) 
    let validKeysLength$ = Observable.of(1, 2, 3, 8, 5).count() 
    validKeysLength$.subscribe(val => validKeysLength = val) 

    let cycles = Math.min(allKeysLength,validKeysLength); // length of the shorter observable    

    // wrapping it in a function so when called variables will take new values 
    function defineObs() { 

    allKeys$ = Observable.of(2, 1, 4, 5, 7) 
     .elementAt(elementAt).take(1); 

    validKeys$ = Observable.of(1, 2, 3, 8, 5) 
     .filter((x) => (x === match)).first(); 
    } 

    for (var i=0; i<=cycles; i++) { 

    defineObs(); 

    allKeys$.subscribe(
     function (val) { match = val }, 
     function (err) { console.log(err) }, 
     function() { console.log('Done filter')} 
    ); 
    validKeys$.subscribe(
     function (val) { keys.push(val) }, 
     function (err) { console.log(err) }, 
     function() { console.log('Done push')} 
    ); 

    elementAt += 1; 
    cycles -= 1; 

    } 

    return console.log(keys); 

} 

感謝您的任何幫助。

+2

你爲什麼不使用observables來處理這一切。你正嘗試應用一種不同的範式,這種範式對於observables來說效果不好。 – Everest

+0

我也認爲這可能只能用observables來完成,但我是RP的新手,所以不知道該怎麼做 –

+0

所以請隨意展示如何獲得兩個流的重複:) –

回答

3

如果你不關心哪個流發出一組重複的的第一個值,你可能只是將它們合併和治療作爲一個單一的數據流發現重複的值:

first.merge(second) 
    .scan(([ dupes, uniques ], next) => 
     [ uniques.has(next) ? dupes.add(next) : dupes, uniques.add(next) ], 
     [ new Set(), new Set() ] 
    ) 
    .map(([ dupes ]) => dupes) 

注:集上面是不可變的,以避免scan中的未定義行爲。

+0

這太棒了!正如我所見,你知道RP和函數式編程。不錯的工作。當我完全理解這是什麼時,我會接受:D –

+0

@ J.D。我可以放下要點:將兩個流中的所有物品合併成一個物體後,我們將所有物品放在一起('uniques'),並且只有在之前看到過'next'物品時才添加到'dupes'。 'scan'可能是代碼片段中最密集的部分,所以我建議[文檔](http://reactivex.io/documentation/operators/scan.html)。 'scan'會給我們兩個集合的所有中間值,所以'map'只提取'dupes'。 – concat

+0

@ J.D。運營商和鏈接旨在使行爲明顯而無痛。通常,這是一種最簡單,最穩健的方法,可以將運算符提交給內存。否則,確定需要什麼通常是最大的努力。快樂的Rx學習! – concat

0

我會檢查Observable.combineLatestscan方法的可觀察序列。

下面是我在想什麼,結合使用combineLatestcombineLatest兩個observables和運算符scan。您甚至可以使用Set來確保唯一性,甚至可以使用mapfilter

+0

請發表一個完整的答案 - 我所有的代碼都在我的問題 –