2014-12-18 107 views
1

我試圖在一組Mongoose模型上使用Highland.js作爲數據庫更新腳本,它似乎非常適合在Model.find()上調用QueryStream。我有一些同步的事情要做(更新我的模型以符合新的模式,一些清理操作),最後我想要save()這個文檔。我配置了一些需要運行的預保存鉤子,並且這些更新與直接Model.update()不兼容。我已經成功地得到它通過Q.js和高地的組合工作排序的:Highland.js中的流上的異步轉換

var sender_stream = Sender.find({}).stream(); 
var promise_save = function(document) { 
    var deferred = Q.defer(); 
    document.save(deferred.makeNodeResolver()); 
    return _(deferred.promise); 
} 

var sender_deferred = Q.defer(); 
_(sender_stream).map(function(sender) { 
    // set some fields on sender... 
    return sender; 
}).map(promise_save).series().on('done', sender_deferred.resolve).resume(); 

然而,這似乎並沒有解決這個承諾,我不知道這是否是「正確的」保持事物的美好和流暢的方式......將Q.js和Highland.js如此密切地結合似乎也很奇怪。有沒有更好的辦法?

+0

更新:它看起來越來越像這是一個問題,'未完成'事件沒有被調用比數據沒有返回。儘管如此,仍然不確定這是否是高地的使用方式。 – 2014-12-18 23:55:03

回答

3

我對Q或高地知之甚少。但是這對於查詢流中的轉換函數看起來很簡單。

var stream = Sender.find({}).stream({ transform: manipulate }) 

function manipulate(document) { 
    // do stuff here 
    return document; 
} 

stream.on("data", function(document) { 
    stream.pause() 
    document.save(function(error) { 
     // error handle, maybe stream.destroy([err]) if you want it to stop immediately 
     stream.resume(); 
    }); 
}); 

stream.on("error", function(err){ 
    //error handle 
}); 

stream.on("close", function(){ 
    console.log("hopefully this worked for you"); 
}); 

轉換函數將在發送'data'事件之前在文檔上運行。一旦轉換函數完成它的東西,它的返回值被髮送到'數據'函數。然後你只需暫停/保存/恢復。

1

您可以使用Highland的異步功能:http://highlandjs.org/#async而不是Promise。貓鼬也返回一個承諾,所以你可以換與高地,而不是異步函數的風格,但仍避免增加Q.

我會建議使用的.map().series().flatMap()而是趨於平緩的溪流回一個文檔流。然後添加.done()也可以用來創建一個Thunk,而不是使用.resume()與'done'事件監聽器結合使用。

老實說,不是100%確定爲什麼你在調用'done'事件時遇到問題。

var sender_stream, set_fields, save, sender_deferred; 

sender_stream = Sender.find({}).stream(); 

save = function save(document) { 
    return _(function(push, next) { 
     document.save(function(err, result) { 
      push(err, document); 
      push(null, _.nil); 
     }); 
    }); 
}; 

set_fields = function setFields(sender) { 
    // set some fields on sender... 
    return sender; 
}; 

sender_deferred = Q.defer(); 

_(sender_stream) 
    .map(setFields) 
    .flatMap(save) 
    .done(function() { 
     sender_deferred.resolve(); 
    });