2015-10-27 35 views
0

我一直在使用Kefir(或Bacon.js,選擇你最喜歡的)來涉及實時數據的個人項目,並且已經到了需要將數據記錄在數據庫來追加一個id,然後將鏈中的id傳遞給對象。實際上,將數據插入數據庫(NeDB)不是問題,而是使用回調並在記錄插入數據庫時​​繼續執行,以及如何解決此問題。在Kefir/Bacon.js流處理

過於簡化的例子:

假設我們有幾個設備解析的數據傾倒入公交車/池:

function Position(data) { 
    this.id = null; 
    this.longitude = data.longitude; 
    this.latitude = data.latitude; 
} 

self.positionDataPool.map(function(position)) { // is this even what really needs to be done? 
    // unsure what to do here { 
     self.db.insert { 
      longitude: position.longitude 
      , latitude: position.latitude 
     }, function(e, newRecord) { 
      if(e) { ... } 
      , else { 
       position.id = newRecord._id; 
       return position; 
      } 
     } 
    //} 
}) 
.filter(function(position) { 
    // the position without an id is passed here 
    ... 
}); 

我懷疑這是不正確或不恰當地使用地圖功能,但在嘗試了幾件事情後出來想法。任何想法,建議或幫助將不勝感激。

我的解決方案

做一噸多的閱讀和試​​驗(對我已經沒有頂部)和要回我的流處理每天的基礎上工作的日子後,我想出瞭解決方案雖然這可能不是最有效的,但此解決方案通過將多個事件源插入數據池(未顯示)來接收來自多個源的輸入。創建一個全新的流來對對象/數據執行單個操作。雖然可擴展性不是這裏的目標,但它允許多個數據源監視來自數據流的數據,而不是直接將數據轉儲到過濾器中。最後,來自處理流的數據將被過濾,只顯示我們想要的結果。

self.savedPositionDataStream = Kefir.stream(function(emitter) { 
    self.positionDataPool.onValue(function(val) { 
     self.db.insert { 
      longitude: position.longitude 
      , latitude: position.latitude 
     }, function(e, newRecord) { 
      val.id = newRecord._id; 
      emitter.emit(val); 
     } 
    }); 
}); 

self.filteredPositionData = savedPositionDataStream.filter(...); 

回答

2

至少有Bacon.js,您可以使用Bacon.fromNodeCallback到插入調用結果封裝成數據流。像

Bacon.fromNodeCallback(self.db, "insert", dataToBeInserted) 

當然,你可以做到這一點與Bacon.fromBinder或類似Kefir.streamfromNodeCallback幫助使這個更容易,因爲它會自動處理成功/錯誤值,並把它們轉換成適當的流事件。

然後flatMap而不是map,做插入並提供結果流:

let insertionResultE = self.positionDataPool.flatMap(val => 
    Bacon.fromNodeCallback(self.db, "insert", val).map("._id") 
) 
insertionResultE.log("insertion result") 

類似的方法適用於酸牛奶爲好。問題在於,您不能在map中執行異步計算和可能失敗的計算,但您可以在flapMap中執行此操作。

這裏只有一個問題,那就是你需要添加至少一個用戶insertionResultE來激活它。在上面的例子中,log這樣做。

+0

感謝您使用fromNodeCallback的完整示例,我試圖在這條道路上多次嘗試並錯過了某個過程。幸運的是,Kefir也支持NodeCallback,我會回過頭來試試這個有趣的事情,看看我能否獲得更好的性能。 – codeape