2014-07-11 41 views
3

我想緩衝Bacon.js中的EventStream的值,就像buffer(closingSelector)在RxJava中的行爲一樣。當「控制器流」(RxJava方法中的closingSelector)發出一個新值時,事件緩衝區將被刷新。Bacon.js控制與其他流的流的緩衝

所以我希望流輸出類似於stream.bufferWithTimeOrCount,但是不是用時間間隔或事件計數來控制緩衝,我想控制與其他流的緩衝。

有沒有一種簡單的方法來實現這個Bacon.js?

回答

1

Bacon.js沒有你需要的功能,所以我看着bacon.js source並寫了一個holdWhen的修改版本。

Bacon.EventStream.prototype.bufferUntilValue = function(valve) { 
var valve_ = valve.startWith(false); 

    return this.filter(false).merge(valve_.flatMapConcat((function(_this) { 
    return function() { 
     return _this.scan([], (function(xs, x) { 
      return xs.concat(x); 
     }), { 
      eager: true 
     }).sampledBy(valve).take(1); 
    }; 
    })(this))); 
}; 

要查看此實際情況,請查看此jsFiddle

+0

真的很好的方法!比我的解決方案更清潔(一起使用'holdWhen'和'bufferWithTime')。花了一段時間才瞭解每個閥門事件如何產生一個新的「掃描儀」,當下一個閥門事件到達時,用'sampledBy(valve).take(1)'返回緩衝值。 – attekei

+0

我簡化了解決方案,看到它在這裏:[jsFiddle](http://jsfiddle.net/7DeQy/) (雖然後來我發現你修改'holdWhen'只有一點點,所以也許有一個原因'this .filter(false).merge(..)'等) – attekei

1

Bacon.holdWhen可用,因爲約0.7.14你想要做什麼差不多,雖然緩衝事件發出的一個接一個:

stream.holdWhen(閥)暫停以及如果閥門最後一個事件是緩衝事件流truthy。當閥門變得麻痹時,所有緩衝事件都會釋放。

如果你需要發出緩衝的事件作爲一個單一的事件,你可以嘗試類似如下:

// source streams 
var sourceObservable = Bacon.interval(1000); 
var closingSelector = new Bacon.Bus(); 

// Constructing a new Observable where we're going to keep our state. 
// 
// We need to keep track of two things: 
// - the buffer that is currently being filled, and 
// - a previous buffer that is being flushed. 
// The state will then look like this: 
// [ buffer, flushed] 
// where both buffer and flushed is an array of events from the source observable. 

// empty initial state 
var initialState = {buffer: [], flushed: []} 

// There are two operations on the state: appending a new element to the buffer 
// and flushing the current buffer: 

// append each event from the source observable to the buffer, 
// keeping flushed unchanged 
var appends = sourceObservable.map(function(e) { 
    return function(state) { 
     state.buffer.push(e); return state; 
    } 
}); 

// each event from the closingSelector replaces the `flushed` with 
// the `buffer`'s contents, inserting an empty buffer. 
var flushes = closingSelector.map(function(_) { 
    return function(state) { return {buffer: [], flushed: state.buffer} } 
}) 

// merge appends and flushes into a single stream and apply them to the initial state 
var ops = appends.merge(flushes) 
var state = ops.scan(initialState, function(acc, f) { return f(acc) }); 

// resulting stream of flushed events 
var flushed = state.sampledBy(closingSelector).map(function(state) { return state.flushed }) 

// triggered with `closingSelector.push({})` 
flushed.onValue(function(x) { console.log("flushed", x) }) 
0

stream.holdWhen(valve)看起來幾乎正是你想要的。它的工作原理與buffer(closingSelector)有所不同:它不是緩衝所有的時間,而是從closingSelector事件沖洗緩衝區,它根據value流中的最後一個值切換緩衝區。

也許你可以使用holdWhen,因爲它是,但如果你想喜歡在buffer(closingSelector)行爲,你可能會做這樣的事情:

var result = sourceStream.holdWhen(closingSelector.flatMap(function(){ 
    return Bacon.fromArray([false, true]); 
}).toProperty(true)); 

在每個事件從closingSelector我們產生value流與價值兩個事件truefalse,即關閉緩衝(觸發刷新),然後立即重新打開。

+0

其實我最初的解決方案非常相似,請參閱[jsFiddle](http://jsfiddle.net/RRWJf/)。唯一不同的是,因爲我想將輸出作爲值的數組,我使用'bufferWithTime(1)'來收集它們。它的工作,但我認爲必須有更好的解決方案:) – attekei