2017-10-19 83 views
4

如何基於分組方法將永不結束的流拆分爲多個結束流?RxJs將流拆分爲多個流

--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...> 

到這些觀測

--a--a-a-a-a-| 
      b---b-b--b-| 
         c-c---c-c-| 
            d-d-d-| 
             e...> 

正如你所看到的,a是在開始的時候,我收到b後,我將不再獲得a所以它應該結束。這就是爲什麼正常的groupBy不好。

+0

只是爲了清楚起見,你是否想要每次發生序列變化時發出可觀察對象的可觀察對象,還是希望發出連續項的數組的可觀察對象? – Pace

+0

第一。我想要在原始發射時發射值的observables。如果原文發生了變化(從'a'到'b'),那麼'a'的可觀測值需要停止 –

回答

5

您可以使用windowshare源觀測。還有一個小竅門與bufferCount(2, 1)

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e'; 
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share(); 

source 
    .bufferCount(2, 1) // delay emission by one item 
    .map(arr => arr[0]) 
    .window(source 
     .bufferCount(2, 1) // keep the previous and current item 
     .filter(([oldValue, newValue]) => oldValue !== newValue) 
    ) 
    .concatMap(obs => obs.toArray()) 
    .subscribe(console.log); 

這版畫(因爲toArray()):

[ 'a', 'a', 'a', 'a', 'a' ] 
[ 'b', 'b', 'b', 'b' ] 
[ 'c', 'c', 'c', 'c' ] 
[ 'd', 'd', 'd' ] 
[ 'e' ] 

這種解決方案的問題是訂閱到source順序。我們需要window通知者在第一個bufferCount之前訂閱。否則,首先將物品推進一次,然後檢查物品是否與.filter(([oldValue, newValue]) ...)之前的物品不同。

這意味着將需要一個window之前推遲發射(這是第.bufferCount(2, 1).map(arr => arr[0])

或者,也許它更容易與publish()控制訂閱我的順序:

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e'; 
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share(); 

const connectable = source.publish(); 

connectable 
    .window(source 
     .bufferCount(2, 1) // keep the previous and current item 
     .filter(([oldValue, newValue]) => oldValue !== newValue) 
    ) 
    .concatMap(obs => obs.toArray()) 
    .subscribe(console.log); 

connectable.connect(); 

輸出是相同

+0

是啊好像我們失去了非常有用的'發佈(選擇器:Observable => Observable ):Observable '發佈的重載以簡化訂閱共享。 – Brandon

+0

@martin糾正我,如果我錯了,但在你的第一個解決方案,你延遲源觀察更多然後實際需要。我認爲你可以使用.delay(0) – ZahiC

+0

而不是bufferCount(2,1).map(arr => arr [0])。另外,爲了保留前一個和當前項目,你可以替換bufferCount(2, 1)與更優雅的pairwise()運算符。 – ZahiC

3

也許有人能想出更簡單的東西,但這個工程(小提琴:https://fiddle.jshell.net/uk01njgc/)...

let counter = 0; 

let items = Rx.Observable.interval(1000) 
.map(value => Math.floor(value/3)) 
.publish(); 

let distinct = items.distinctUntilChanged() 
.publish(); 

distinct 
.map(value => { 
    return items 
    .startWith(value) 
    .takeUntil(distinct); 
}) 
.subscribe(obs => { 
    let obsIndex = counter++; 
    console.log('New observable'); 
    obs.subscribe(
    value => { 
     console.log(obsIndex.toString() + ': ' + value.toString()); 
    }, 
    err => console.log(err), 
    () => console.log('Completed observable') 
); 
}); 

distinct.connect(); 
items.connect(); 
+0

另外,您可以使用'share'而不是'publish'節省連接呼叫的麻煩但我沒有嘗試。 – Pace

2

這裏有一個變化,包裝所有的suscription分享給你...

const stream = ...; 

// an Observable<Observable<T>> 
// each inner observable completes when the value changes 
const split = Observable 
    .create(o => { 
    const connected = stream.publish(); 

    // signals each time the values change (ignore the initial value) 
    const newWindowSignal = connected.distinctUntilChanged().skip(1); 

    // send the observables to our observer 
    connected.window(newWindowSignal).subscribe(o); 

    // now "start" 
    return connected.connect(); 
    }); 
+0

它發出我'[a,a,a,a,b],[b,b,b,c]' –