我一直在使用Kefir(或Bacon.js,選擇你最喜歡的)來涉及實時數據的個人項目,並且已經到了需要將數據記錄在數據庫來追加一個id,然後將鏈中的id傳遞給對象。實際上,將數據插入數據庫(NeDB)不是問題,而是使用回調並在記錄插入數據庫時繼續執行,以及如何解決此問題。在Kefir/Bacon.js流處理
過於簡化的例子:
假設我們有幾個設備解析的數據傾倒入公交車/池:
function Position(data) {
this.id = null;
this.longitude = data.longitude;
this.latitude = data.latitude;
}
self.positionDataPool.map(function(position)) { // is this even what really needs to be done?
// unsure what to do here {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
if(e) { ... }
, else {
position.id = newRecord._id;
return position;
}
}
//}
})
.filter(function(position) {
// the position without an id is passed here
...
});
我懷疑這是不正確或不恰當地使用地圖功能,但在嘗試了幾件事情後出來想法。任何想法,建議或幫助將不勝感激。
我的解決方案
做一噸多的閱讀和試驗(對我已經沒有頂部)和要回我的流處理每天的基礎上工作的日子後,我想出瞭解決方案雖然這可能不是最有效的,但此解決方案通過將多個事件源插入數據池(未顯示)來接收來自多個源的輸入。創建一個全新的流來對對象/數據執行單個操作。雖然可擴展性不是這裏的目標,但它允許多個數據源監視來自數據流的數據,而不是直接將數據轉儲到過濾器中。最後,來自處理流的數據將被過濾,只顯示我們想要的結果。
self.savedPositionDataStream = Kefir.stream(function(emitter) {
self.positionDataPool.onValue(function(val) {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
val.id = newRecord._id;
emitter.emit(val);
}
});
});
self.filteredPositionData = savedPositionDataStream.filter(...);
感謝您使用fromNodeCallback的完整示例,我試圖在這條道路上多次嘗試並錯過了某個過程。幸運的是,Kefir也支持NodeCallback,我會回過頭來試試這個有趣的事情,看看我能否獲得更好的性能。 – codeape