Inserting large CSVs into MongoDB using Node.js and async.queue

I am trying to upload and insert large CSV files (100K rows; 10-100M+) into mongo.
The route below accepts input from a form, first inserts the CSV's metadata into my collection holding the metadata for all CSVs, and then inserts the CSV's records into their own collection. It works for smaller files (thousands of rows), but takes far too long once they reach the order of 50K+ rows.
The next snippet uses csv streams for the larger files (see below), but I hit errors when trying to use streams.
Question: can someone help me convert the first example into a streaming approach, so that it can handle large CSVs without hanging?
var fileSystem = require('fs');

exports.addCSV = function(req, res) {
    var body = req.body;
    // renameSync is synchronous and takes no callback, so errors are caught here
    try {
        fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile');
    } catch (err) {
        fileSystem.unlink(req.files.myCSV.path, function() {});
        throw err;
    }
    var myObject = {
        userid: body.userid,
        name: body.name,
        description: body.description
    };
    var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db) {
        if (err) throw err;
        var collection = db.collection('myCSVs');
        collection.insert(myObject, function(err, insertedMyObject) {
            // mapFile loads the entire CSV into memory as an array of row objects
            csvParser.mapFile('uploads/myFile', function(err, allRows) {
                if (err) throw err;
                var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
                for (var r in allRows) {
                    allRows[r].metric = parseFloat(allRows[r].metric);
                }
                var finalcollection = db.collection(collectionId);
                // a single bulk insert of every row at once
                finalcollection.insert(allRows, function(err, insertedAllRows) {
                    if (err) {
                        res.send(404, "Error");
                    } else {
                        res.send(200);
                    }
                });
            });
        });
    });
};
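For reference, a middle ground between the single giant insert above and full streaming would be to split allRows into smaller batches. This is only a rough sketch (the 1000-row batch size is an arbitrary assumption, and it still reads the whole file into memory first), assuming async is required as in the later examples:

    var batchSize = 1000; // arbitrary; tune to taste
    var batches = [];
    for (var i = 0; i < allRows.length; i += batchSize) {
        batches.push(allRows.slice(i, i + batchSize));
    }
    // insert one batch at a time instead of all 100K+ rows in a single call
    async.eachSeries(batches, function(batch, next) {
        finalcollection.insert(batch, next);
    }, function(err) {
        if (err) return res.send(404, "Error");
        res.send(200);
    });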
EDIT (so that people will take the question off hold):
I tried this approach using streams:
var fileSystem = require('fs');
var async = require('async');
var csv = require('csv');

exports.addCSV = function(req, res) {
    var body = req.body;
    try {
        fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile');
    } catch (err) {
        fileSystem.unlink(req.files.myCSV.path, function() {});
        throw err;
    }
    var myObject = {
        userid: body.userid,
        name: body.name,
        description: body.description
    };
    var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db) {
        if (err) throw err;
        var collection = db.collection('myCSVs');
        collection.insert(myObject, function(err, insertedMyObject) {
            var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
            var finalcollection = db.collection(collectionId);
            // the bound insert is used directly as the queue worker, concurrency 5
            var q = async.queue(finalcollection.insert.bind(finalcollection), 5);
            q.drain = function() {
                console.log('all items have been processed');
            };
            csv()
                .from.path('uploads/myFile', { columns: true })
                .transform(function(data, index, cb) {
                    // hand each parsed row to the queue; cb resumes the parser
                    q.push(data, cb);
                })
                .on('end', function() {
                    res.send(200);
                    console.log('on.end() executed');
                })
                .on('error', function(err) {
                    res.send(500, err.message);
                    console.log('on.error() executed');
                });
        });
    });
};
But I get this error:
events.js:72
throw er; // Unhandled 'error' event
^
TypeError: object is not a function
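One way to rule out the bind is to write the worker function explicitly, which also makes the (task, callback) signature that async.queue expects visible. A minimal sketch of what I mean, assuming the mongodb 1.x insert(doc, callback) signature:

    var q = async.queue(function(doc, callback) {
        // insert a single parsed row; async.queue supplies the completion callback
        finalcollection.insert(doc, callback);
    }, 5);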
Third, I tried this streaming approach:
var q = async.queue(function(task, callback) {
    // bind only creates a function here, so insert is never actually invoked,
    // and callback() fires immediately, letting the queue drain right away
    finalcollection.insert.bind(task, function(err, row) {});
    callback();
}, 5);
q.drain = function() {
    console.log('all items have been processed');
};
csv()
    .from.path('uploads/myFile', { columns: true })
    .transform(function(data, index, cb) {
        q.push(data);
    })
    .on('end', function() {
        res.send(200);
        console.log('on.end() executed');
    })
    .on('error', function(err) {
        res.send(500, err.message);
        console.log('on.error() executed');
    });
This inserts a few and then aborts:
all items have been processed
all items have been processed
Error: Request aborted
at IncomingMessage.<anonymous>
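A worker that actually performs the insert, and only signals the queue once mongo has acknowledged the write, would look roughly like this (an untested sketch):

    var q = async.queue(function(task, callback) {
        finalcollection.insert(task, function(err, row) {
            // defer the queue callback until the insert has really finished
            callback(err);
        });
    }, 5);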
That third attempt actually ends up attempting multiple collection inserts of the same CSV into the db. Lastly, I would like to use the one-line definition of q:
var q = async.queue(finalcollection.insert.bind(finalcollection), 5);
together with:
.transform(function(data, index, cb) {
    q.push(data, function(err) {
        console.log('finished processing foo');
    });
})
but it inserts into the collection several times over, and every run ends with the abort shown in the output below. Why isn't it exiting correctly, and why does it keep re-inserting?
finished processing foo
finished processing foo
finished processing foo
finished processing foo
finished processing foo
all items have been processed
Error: Request aborted
at IncomingMessage.<anonymous> (.../node_modules/express/node_modules/connect/node_modules/multiparty/index.js:93:17)
at IncomingMessage.EventEmitter.emit (events.js:92:17)
at abortIncoming (http.js:1892:11)
at Socket.serverSocketCloseListener (http.js:1904:5)
at Socket.EventEmitter.emit (events.js:117:20)
at TCP.close (net.js:466:12)
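My working guess is that the response is being sent while rows are still queued, so one coordination sketch, under that assumption, is to respond only after both the csv 'end' event and the queue's drain have fired:

    var parsingDone = false;
    q.drain = function() {
        // the queue is empty; respond only if the parser has also finished
        if (parsingDone) res.send(200);
    };
    csv()
        .from.path('uploads/myFile', { columns: true })
        .transform(function(data, index, cb) {
            q.push(data, cb); // cb applies backpressure to the parser
        })
        .on('end', function() {
            parsingDone = true;
            // handle the case where the queue drained before parsing ended
            if (q.idle()) res.send(200);
        });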
See this answer for reference: http://stackoverflow.com/questions/8045838/importing-a-very-large-record-set-into-mongodb-using-nodejs – phineas
Try to pinpoint the source of the slowness. 'mapFile' reads the entire CSV file into memory, for example, which could cause swapping if your server is low on free memory. Also, instead of bulk-inserting all records in a single 'insert', try splitting them into more manageable chunks. And don't forget to use 'for (var r in ...)' to avoid creating an overwritable global variable. – robertklep
phineas, I have already seen that post and it does not have the solution; this is not a command-line issue. robertklep, that's too vague. – SOUser