2014-03-06 135 views
1

我的目標是插入非常CSV的大,這樣對我無法使用CSV流,像這樣:產卵mongoinsert

  var myCollection = db.collection(myCollectionId); 

      var q = async.queue(Collection.insert.bind(myCollection), 10); 

      csv() 
      .from.path(myFilePath, {columns: true}) 
      .transform(function(data, index, cb){ 

        q.push(data, function (err, res) { 
         if (err) return cb(err); 
         cb(null, res[0]); 
        }); 

      }) 
      .on('end', function() { 

       q.drain = function() { 

         //do some stufff 
       }; 

      }) 
      .on('error', function (err) { 
       res.end(500, err.message); 
       console.log('on.error() executed'); 
      }); 

     }); 

但是當文件獲得真正大的,像70M +,它的流媒體他們,我的服務器是非常緩慢,並且需要永久,並且當我嘗試在網站上加載頁面時,它在此過程中昏昏欲睡。

爲什麼不可能像這樣使用cron-job來執行mongo插入。我問,因爲相同的插入可能需要mongo命令行30秒。

P.S.不介意readFile和lines部分,我這樣做是因爲我想測試在進程啓動後所有行何時插入到集合中(尚未實現此功能)。

var cronJob = require('cron').CronJob; 
var spawn = require('child_process').spawn; 
var fs = require('fs'); 
function MongoImportEdgeFile(dataID, filePath){ 

var scriptPath = "/local/main/db/mongodb-linux-x86_64-2.4.5/bin/mongoimport"; 
console.log("script path = "+scriptPath) 
var output = ""; 

fs.readFile(filePath, 'utf-8',function(err, data) { 

     if (err){ 
      console.log(err) 
      throw err; 
     } 

     //console.log('data = '+data); 
     var lines = data.split('\n'); 
     console.log("total lines in file = " + lines); 

     var job = new cronJob(new Date(), function() { 
      // store reference to 'this', which is cronJob object. needed to stop job after script is done executing. 
      var context = this; 

      // execute R script asynchronously 
      var script = spawn(scriptPath, [" -d mydb -c Data_ForID_" + dataID + " --file " + filePath + " --type csv" ]); 
      console.log("Executing R script via node-cron: " + scriptPath); 

      // script has finished executing, so complete cron job and fire completion callback 
      script.on('close', function() { 
       console.log('inside script.on(close, function() for import'); 
       context.stop(); 
      }); 
     }, function() { 
      // callback function that executes upon completion 
      console.log("Finished executing import"); 

     }, true); 

    }); 

}

+0

爲什麼不從child_process.exec執行mongoimport? –

+0

嗨喬希,你是什麼意思?我試圖在cron中使用spawn – SOUser

回答

1

您不應該使用個人insert來電。您迫使mongo與每次通話都執行內部同步 - 我認爲考慮到您的平行處理方式,情況更糟。

使用bulk insertion:這與使用array調用insert()一樣簡單。

+0

當我嘗試插入作爲我得到的文檔數組: – SOUser

+0

當我嘗試插入作爲文檔數組我得到錯誤:文檔超過最大允許bson大小16777216字節 – SOUser

+0

而且我已經檢查了數組,所有文檔都有形式{a:'hi',b:'there',c:'bye'}。當我從規模100,000到1,000,000,我得到的錯誤。 – SOUser

0

你可以直接從節點通過創建 child process執行 mongoimportHere's an article on using mongoimport to import a csv。你也可以做 json

不知何故,我錯過了有關在cron中使用mongoimport的部分。如果我理解正確,看起來你知道你想導入的csv,而你正在使用cron來檢查它們。

你考慮過一個消息隊列嗎?這將允許您的處理器立即接收導入作業,而不是間隔。這也會抑制你的處理。

如果您需要更多吞吐量,則可以創建附加到同一隊列的其他偵聽器進程。他們將爭奪下一份工作。這將允許您的解決方案進行擴展。

+0

我想在Node.js中執行此操作,而不是從命令行 – SOUser

+0

@SOUser我建議您在Node.js中執行此操作,而不是在命令行中執行此操作。不過,我對答案作了調整。 –