我想緩衝Bacon.js中的EventStream的值,就像buffer(closingSelector)
在RxJava中的行爲一樣。當「控制器流」(RxJava方法中的closingSelector)發出一個新值時,事件緩衝區將被刷新。Bacon.js控制與其他流的流的緩衝
所以我希望流輸出類似於stream.bufferWithTimeOrCount
,但是不是用時間間隔或事件計數來控制緩衝,我想控制與其他流的緩衝。
有沒有一種簡單的方法來實現這個Bacon.js?
我想緩衝Bacon.js中的EventStream的值,就像buffer(closingSelector)
在RxJava中的行爲一樣。當「控制器流」(RxJava方法中的closingSelector)發出一個新值時,事件緩衝區將被刷新。Bacon.js控制與其他流的流的緩衝
所以我希望流輸出類似於stream.bufferWithTimeOrCount
,但是不是用時間間隔或事件計數來控制緩衝,我想控制與其他流的緩衝。
有沒有一種簡單的方法來實現這個Bacon.js?
Bacon.js沒有你需要的功能,所以我看着bacon.js source並寫了一個holdWhen
的修改版本。
Bacon.EventStream.prototype.bufferUntilValue = function(valve) {
var valve_ = valve.startWith(false);
return this.filter(false).merge(valve_.flatMapConcat((function(_this) {
return function() {
return _this.scan([], (function(xs, x) {
return xs.concat(x);
}), {
eager: true
}).sampledBy(valve).take(1);
};
})(this)));
};
要查看此實際情況,請查看此jsFiddle。
Bacon.holdWhen
可用,因爲約0.7.14你想要做什麼差不多,雖然緩衝事件發出的一個接一個:
stream.holdWhen(閥)暫停以及如果閥門最後一個事件是緩衝事件流truthy。當閥門變得麻痹時,所有緩衝事件都會釋放。
如果你需要發出緩衝的事件作爲一個單一的事件,你可以嘗試類似如下:
// source streams
var sourceObservable = Bacon.interval(1000);
var closingSelector = new Bacon.Bus();
// Constructing a new Observable where we're going to keep our state.
//
// We need to keep track of two things:
// - the buffer that is currently being filled, and
// - a previous buffer that is being flushed.
// The state will then look like this:
// [ buffer, flushed]
// where both buffer and flushed is an array of events from the source observable.
// empty initial state
var initialState = {buffer: [], flushed: []}
// There are two operations on the state: appending a new element to the buffer
// and flushing the current buffer:
// append each event from the source observable to the buffer,
// keeping flushed unchanged
var appends = sourceObservable.map(function(e) {
return function(state) {
state.buffer.push(e); return state;
}
});
// each event from the closingSelector replaces the `flushed` with
// the `buffer`'s contents, inserting an empty buffer.
var flushes = closingSelector.map(function(_) {
return function(state) { return {buffer: [], flushed: state.buffer} }
})
// merge appends and flushes into a single stream and apply them to the initial state
var ops = appends.merge(flushes)
var state = ops.scan(initialState, function(acc, f) { return f(acc) });
// resulting stream of flushed events
var flushed = state.sampledBy(closingSelector).map(function(state) { return state.flushed })
// triggered with `closingSelector.push({})`
flushed.onValue(function(x) { console.log("flushed", x) })
stream.holdWhen(valve)
看起來幾乎正是你想要的。它的工作原理與buffer(closingSelector)
有所不同:它不是緩衝所有的時間,而是從closingSelector
事件沖洗緩衝區,它根據value
流中的最後一個值切換緩衝區。
也許你可以使用holdWhen
,因爲它是,但如果你想喜歡在buffer(closingSelector)
行爲,你可能會做這樣的事情:
var result = sourceStream.holdWhen(closingSelector.flatMap(function(){
return Bacon.fromArray([false, true]);
}).toProperty(true));
在每個事件從closingSelector
我們產生value
流與價值兩個事件true
和false
,即關閉緩衝(觸發刷新),然後立即重新打開。
其實我最初的解決方案非常相似,請參閱[jsFiddle](http://jsfiddle.net/RRWJf/)。唯一不同的是,因爲我想將輸出作爲值的數組,我使用'bufferWithTime(1)'來收集它們。它的工作,但我認爲必須有更好的解決方案:) – attekei
真的很好的方法!比我的解決方案更清潔(一起使用'holdWhen'和'bufferWithTime')。花了一段時間才瞭解每個閥門事件如何產生一個新的「掃描儀」,當下一個閥門事件到達時,用'sampledBy(valve).take(1)'返回緩衝值。 – attekei
我簡化了解決方案,看到它在這裏:[jsFiddle](http://jsfiddle.net/7DeQy/) (雖然後來我發現你修改'holdWhen'只有一點點,所以也許有一個原因'this .filter(false).merge(..)'等) – attekei