2016-11-11 31 views
3

我想要做什麼,我認爲是一個pausable buffer如何創建一個pausableBuffer W/rxjs 5

我有某人分享他們的代碼,但我無法弄清楚如何把它變成一個定製。操作(沒有打字稿/只ES6

const attach = Rx.Observable.timer(0 * 1000, 8 * 1000).mapTo('@'); 
const detach = Rx.Observable.timer(4 * 1000, 8 * 1000).mapTo('#'); 

const input = Rx.Observable.interval(1* 1000); 
const pauser = attach.mapTo(true).merge(detach.mapTo(false)); 

input 
    .publish(_input => _input 
    .combineLatest(pauser, (v, b) => b) 
    .filter(e => e) 
    .publish(_switch => _input.bufferWhen(() => _switch.take(1))) 
) 
    .flatMap(e => Rx.Observable.from(e)) 
    .concatMap(e => Rx.Observable.empty().delay(150).startWith(e)) 

有人可以幫助我創造這樣我可以做input.pausableBuffer(pauser)(也許定義startsWith)

回答

4

你可以把它添加到原型是這樣的:

var pausableBuffer = function(pauser) { 
    return this.publish(_input => _input 
    .combineLatest(pauser, (v, b) => b) 
    .filter(e => e) 
    .publish(_switch => _input.bufferWhen(() => _switch.take(1))) 
) 
    .flatMap(e => Rx.Observable.from(e)); 
} 

Rx.Observable.prototype.pausableBuffer = pausableBuffer; 

有一點要記住的是,這將從暫停狀態開始。要在活動狀態下啓動它,請將.startWith(true)添加到pauser

var pausableBuffer = function(pauser) { 
    return this.publish(_input => _input 
    .combineLatest(pauser.startWith(true), (v, b) => b) 
    .filter(e => e) 
    .publish(_switch => _input.bufferWhen(() => _switch.take(1))) 
) 
    .flatMap(e => Rx.Observable.from(e)); 
} 

Rx.Observable.prototype.pausableBuffer = pausableBuffer;