2014-09-01 19 views
1

我試圖用mongoose將大數據集插入到mongodb中。但在此之前,我需要確保我的for循環正常工作。節點使用貓鼬插入大數據

// basic schema settings 
var mongoose = require('mongoose'); 
var Schema = mongoose.Schema; 
var TempcolSchema = new Schema({ 
    cid: { 
     type: Number, 
     required: true 
    }, 
    loc:[] 
}); 
TempcolSchema.index({ 
    'loc': "sphere2d" 
}); 


// we can easily see from the output that the forloop runs correctly 
mongoose.connect('mongodb://localhost/mean-dev', function(err){ 
    for (var i = 0; i < 10000000; i++) { 
     var c = i; 
     console.log(c); 
    } 
}); 

輸出爲1,2,3,4,......等

現在我想添加一個貓鼬保存語句轉換成for循環。

mongoose.connect('mongodb://localhost/mean-dev', function(err){ 
    var Tempcol = mongoose.model('Tempcol', TempcolSchema); 

    for (var i = 0; i < 10000000; i++) { 
     var c = i; 
     console.log(c); 
     var lon = parseInt(c/100000); 
     var lat = c%100000; 
     new Tempcol({cid: Math.random(), loc: [lon, lat]}).save(function(err){}); 
    } 
}); 

輸出仍然是1,2,3,4,......但是for循環一段時間後停止,並說是達到最大堆棧和有某種記憶的問題。另外,當我檢查集合時,我意識到根本沒有插入任何數據點。

因此,有誰知道可能發生了什麼?謝謝。

回答

2

這裏的問題是,您正在運行的循環並未等待每個操作完成。所以實際上你只是排隊等待1000年的.save()請求並試圖同時運行它們。你不能在合理的限制內做到這一點,因此你會得到錯誤迴應。

async模塊在處理該迭代器的回調時有多種迭代方法,其中最簡單的直接方法是whilst。貓鼬也爲你處理連接管理,而不需要回調中嵌入,因爲模型是連接識別:

var tempColSchema = new Schema({ 
    cid: { 
     type: Number, 
     required: true 
    }, 
    loc:[] 
}); 

var TempCol = mongoose.model("TempCol", tempColSchema); 

mongoose.connect('mongodb://localhost/mean-dev'); 

var i = 0; 
async.whilst(
    function() { return i < 10000000; }, 
    function(callback) { 
     i++; 
     var c = i; 
     console.log(c); 
     var lon = parseInt(c/100000); 
     var lat = c%100000; 
     new Tempcol({cid: Math.random(), loc: [lon, lat]}).save(function(err){ 
      callback(err); 
     });    
    }, 
    function(err) { 
     // When the loop is complete or on error 
    } 
); 

不是最偉大的方式做到這一點,但它仍然是一個寫在同一時間,你可以使用其他方法來「治理」併發操作,但這至少不會炸燬調用堆棧。

形成MongoDB 2.6及更高版本,您可以使用Bulk Operations API來一次處理多個寫入服務器。所以這個過程是相似的,但這個時候,你可以在一個單一的寫入和響應,這是更快一次發送1000到服務器:

var tempColSchema = new Schema({ 
    cid: { 
     type: Number, 
     required: true 
    }, 
    loc:[] 
}); 

var TempCol = mongoose.model("TempCol", tempColSchema); 

mongoose.connect('mongodb://localhost/mean-dev'); 

mongoose.on("open",function(err,conn) { 

    var i = 0; 
    var bulk = TempCol.collection.initializeOrderedBulkOp(); 

    async.whilst(
     function() { return i < 10000000; }, 
     function(callback) { 
     i++; 
     var c = i; 
     console.log(c); 
     var lon = parseInt(c/100000); 
     var lat = c%100000; 

     bulk.insert({ "cid": Math.random(), "loc": [ lon, lat ] }); 

     if (i % 1000 == 0) { 
      bulk.execute(function(err,result) { 
       bulk = TempCol.collection.initializeOrderedBulkOp(); 
       callback(err); 
      }); 
     } else { 
      process.nextTick(callback); 
     } 
     }, 
     function(err) { 
     // When the loop is complete or on error 

     // If you had a number not plainly divisible by 1000 
     if (i % 1000 != 0) 
      bulk.execute(function(err,result) { 
       // possibly check for errors here 
      }); 
     } 
    ); 

}); 

,實際上是利用其尚未在本地驅動程序的方法暴露在貓鼬身上,所以需要額外注意確保連接可用。這只是一個例子,但不是唯一的方法,但重點是貓鼬的「神奇」連接並不是建立在這裏,所以你應該確定它已經建立。

你有一個要處理的項目的數量,但不是你應該在最後一個塊中調用bulk.execute()以及顯示的情況,但它取決於響應模數的數量。

重點是不將堆棧操作增加到不合理的大小,並保持處理的有限性。這裏的流程控制允許在進行下一次迭代之前需要一些時間才能真正完成的操作。所以無論是批量更新還是一些額外的並行排隊是您想要的最佳性能。

如果您不希望寫入錯誤是致命的,而是以不同的方式處理這些錯誤,那麼也有.initializeUnorderedBulkOp()表單。大多數情況下,請參閱Bulk API的官方文檔以及如何解釋所給回覆的響應。

+0

非常感謝。我會稍後嘗試代碼! – 2014-09-04 16:41:40

+0

我試過你的方法,它按預期工作。但我還有一個問題...而不是回調(錯誤)我把setTimeout(回調(錯誤),10000),並希望延遲所有回調10秒。它不像我想象的那樣工作。我的推理有什麼問題嗎?謝謝。 – 2014-09-04 16:59:19

+0

@Chandler這裏的關鍵是你不能使用標準的'for'循環,並且需要循環來要求回調,因爲這裏每種情況都有。我不確定根據您的評論,您在代碼中試圖設置超時或爲了什麼目的。事實上,如果你有另一個問題,最好再提出一個問題。這個答案告訴你有關回調控制循環和插入的最佳方法。 – 2014-09-04 22:12:27