您可以使用window
和share
源觀測。還有一個小竅門與bufferCount(2, 1)
:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
source
.bufferCount(2, 1) // delay emission by one item
.map(arr => arr[0])
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
這版畫(因爲toArray()
):
[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]
這種解決方案的問題是訂閱到source
順序。我們需要window
通知者在第一個bufferCount
之前訂閱。否則,首先將物品推進一次,然後檢查物品是否與.filter(([oldValue, newValue]) ...)
之前的物品不同。
這意味着將需要一個window
之前推遲發射(這是第.bufferCount(2, 1).map(arr => arr[0])
或者,也許它更容易與publish()
控制訂閱我的順序:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
const connectable = source.publish();
connectable
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
connectable.connect();
輸出是相同
只是爲了清楚起見,你是否想要每次發生序列變化時發出可觀察對象的可觀察對象,還是希望發出連續項的數組的可觀察對象? – Pace
第一。我想要在原始發射時發射值的observables。如果原文發生了變化(從'a'到'b'),那麼'a'的可觀測值需要停止 –