2013-12-21 42 views
2

我試圖上傳並插入大的csv文件(100K行; 10-100M +)到mongo中。使用Node.js和async.queue將大CSV插入到MongoDB中

下面的代碼是我用來從表單接受輸入並將記錄首先插入到我的所有csv的元數據集合中,然後將csv的記錄插入到它自己的集合中的路由。它適用於較小的文件(成千上萬行),但當它達到50K +的順序時需要花太多時間。

下一個片段將csv流用於較大的文件(請參見下文),但在嘗試使用流時遇到錯誤。

問:有人可以幫助修改第一個例子中成流,這樣它會理線大禾的無掛。

exports.addCSV = function(req,res){ 

var body = req.body; 

fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){ 
    if(err){ 
     fileSystem.unlink(req.files.myCSV.path, function(){}); 
     throw error; 
    } 
}); 

var myObject = { userid: body.userid, 
        name: body.name, 
        description: body.description 
       }; 

var MongoClient = require('mongodb').MongoClient; 
MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){ 

    if(err) throw err; 

    var collection = db.collection('myCSVs'); 

    collection.insert(myObject, function(err, insertedMyObject){ 

     csvParser.mapFile('uploads/myFile', function(err, allRows){ 
       if (err) throw err; 

       var collectionId = "Rows_ForID_" + insertedMyObject[0]._id; 

       for (r in allRows) { 
        allRows[r].metric = parseFloat(allRows[r].metric); 
       } 

       var finalcollection = db.collection(collectionId); 
       finalcollection.insert(allRows, function(err, insertedAllRows) { 
         if (err) { 
          res.send(404, "Error"); 
         } 
         else { 
          res.send(200); 
         } 
       }); 
     }); 
    }); 
}); 

} 

EDIT(讓人們刪除了待命狀態):

我使用流嘗試這種方法:

exports.addCSV = function(req,res){ 

    var body = req.body; 

    fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){ 
    if(err){ 
     fileSystem.unlink(req.files.myCSV.path, function(){}); 
     throw error; 
    } 
    }); 

    var myObject = { userid: body.userid, 
       name: body.name, 
       description: body.description 
      }; 

    var MongoClient = require('mongodb').MongoClient; 
    MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){ 

    if(err) throw err; 

    var collection = db.collection('myCSVs'); 

    collection.insert(myObject, function(err, insertedMyObject){ 

     var collectionId = "Rows_ForID_" + insertedMyObject[0]._id; 
     var finalcollection = db.collection(collectionId); 
     var q = async.queue(finalcollection.insert.bind(finalcollection), 5); 

     q.drain = function() { 
      console.log('all items have been processed'); 
     } 

     csv() 
     .from.path('uploads/myFile', {columns: true}) 
     .transform(function(data, index, cb){ 

       q.push(data, cb); 

     }) 
     .on('end', function() { 
      res.send(200); 
      console.log('on.end() executed'); 
     }) 
     .on('error', function (err) { 
      res.end(500, err.message); 
      console.log('on.error() executed'); 
     }); 

    }); 

}); 

} 

但我得到這個錯誤:

events.js:72 
    throw er; // Unhandled 'error' event 
     ^
TypeError: object is not a function 

三,我試過這個流式方法:

var q = async.queue(function (task,callback) { 
finalollection.insert.bind(task,function(err, row) { }); 
callback(); 
}, 5); 

q.drain = function() { 
    console.log('all items have been processed'); 
} 

csv() 
.from.path('uploads/myFile', {columns: true}) 
.transform(function(data, index, cb){ 
    q.push(data) 
}) 
.on('end', function() { 
    res.send(200); 
    console.log('on.end() executed'); 
}) 
.on('error', function (err) { 
    res.end(500, err.message); 
    console.log('on.error() executed'); 
}); 

這會插入幾個,然後中止:

all items have been processed 
all items have been processed 
Error: Request aborted 
    at IncomingMessage.<anonymous>  

這一個實際上是嘗試相同的CSV的多個集合插入分貝。最後,我想q的一個班輪定義:

var q = async.queue(finalcollection.insert.bind(finalcollection), 5); 

隨着:

.transform(function(data, index, cb){ 

       q.push(data,function (err) { 
        console.log('finished processing foo'); 
       }); 

}) 

,並將其插入收集好幾次,每次(如下圖,每次發生的輸出中止 - 爲什麼是不是正確退出並重新插入?):

finished processing foo 
finished processing foo 
finished processing foo 
finished processing foo 
finished processing foo 
all items have been processed 

Error: Request aborted 
    at IncomingMessage.<anonymous> (.../node_modules/express/node_modules/connect/node_modules/multiparty/index.js:93:17) 
    at IncomingMessage.EventEmitter.emit (events.js:92:17) 
    at abortIncoming (http.js:1892:11) 
at Socket.serverSocketCloseListener (http.js:1904:5) 
at Socket.EventEmitter.emit (events.js:117:20) 
at TCP.close (net.js:466:12) 
+0

這裏參考答案:http://stackoverflow.com/questions/8045838/importing-a-very-large-record-set-into-mongodb-using-nodejs – phineas

+1

嘗試確定在緩慢的來源。 'mapFile'將整個CSV文件讀取到內存中,例如,如果您的服務器空閒內存不足,可能會導致交換。另外,不要用'insert'一次性批量插入所有記錄,而是嘗試將它們拆分成更易於管理的塊。不要忘記使用'for(var r in ...)'來防止創建一個可覆寫的全局變量。 – robertklep

+0

菲尼亞斯,我已經看過那篇文章,並沒有解決方案。這不是命令行問題。 robertkelp,太模糊。 – SOUser

回答

1

你應該用流處理一個大文件。

這裏是一個可能的解決方案:

var queue = async.queue(collection.insert.bind(collection), 5); 

csv() 
.from.path('./input.csv', { columns: true }) 
.transform(function (data, index, cb) { 
    queue.push(data, function (err, res) { 
     if (err) return cb(err); 
     cb(null, res[0]); 
    }); 
}) 
.on('error', function (err) { 
    res.send(500, err.message); 
}) 
.on('end', function() { 
    queue.drain = function() { 
     res.send(200); 
    }; 
}); 

請注意:

  • ,我們使用的node-csv流API,它可以確保數據在同一時間處理的文件閱讀:以這種方式,整個文件不會立即在內存中讀取。每個記錄都執行transform處理程序;
  • 我們使用async.queue,這是一個異步處理隊列:最多5個處理程序(finalcollection.insert)並行執行。

這個例子應該測試,因爲我不太確定它是否能很好地處理背壓。此外,隊列的併發級別應根據您的特定配置進行調整。

您還可以找到一個working gist here

+0

當我嘗試你的代碼時出現這個錯誤:dbName = self.db.databaseName; ^ TypeError:Can not讀取屬性'databaseName'的未定義 – SOUser

+0

看起來像一個綁定問題,嘗試'var queue = async.queue(finalcollection.insert.bind(finalcollection),5);' –

+0

events.js:72 throw er; // Unhandled 「錯誤」事件 ^ 類型錯誤:對象#沒有法「的indexOf」 – SOUser