2017-08-05 140 views
1

我試圖有效地插入大量的數據(XML文件超過70GB的大小)沒有崩潰我的MongoDB服務器。目前,這就是我在使用的NodeJS做xml-stream流插入XML數據的數據庫

var fs = require('fs'), 
    path = require('path'), 
    XmlStream = require('xml-stream'), 
    MongoClient = require('mongodb').MongoClient, 
    assert = require('assert'), 
    ObjectId = require('mongodb').ObjectID, 
    url = 'mongodb://username:[email protected]:27017/mydatabase', 
    amount = 0; 

var stream = fs.createReadStream(path.join(__dirname, 'motor.xml')); 
var xml = new XmlStream(stream); 

xml.collect('ns:Statistik'); 
xml.on('endElement: ns:Statistik', function(item) { 
    var insertDocument = function(db, callback) { 
     db.collection('vehicles').insertOne(item, function(err, result) { 
      amount++; 
      if (amount % 1000 == 0) { 
       console.log("Inserted", amount); 
      } 
      callback(); 
     }); 
    }; 

    MongoClient.connect(url, function(err, db) { 
     insertDocument(db, function() { 
      db.close(); 
     }); 
    }); 
}); 

當我打電話xml.on()它基本上返回樹/元素,我目前。由於這是JSON的直線,所以我可以將它作爲參數提供給我的db.collection().insertOne()函數,並將其按照我的需要插入到數據庫中。

所有代碼的實際工作,因爲它是現在,但經過約3000插入停止(約需10秒)。我懷疑這是因爲我打開數據庫連接,插入數據,然後每次在XML文件中看到一棵樹時都關閉連接,在這種情況下,大約有3000次。

我可以在某種程度上將insertMany()函數合併爲100個(或更多)的塊,但我不太確定這將如何處理這一切,這些工作都是流式傳輸和異步處理。

所以我的問題是:如何插入大量XML(以JSON)到我的MongoDB數據庫沒有它崩潰?

回答

1

你假設.insertMany()比寫每一次都好,所以它只是收集"stream"上的數據。

因爲執行是「異步」,你通常要避免在棧上太多的主動呼叫,所以通常你調用.insertMany()然後.resume()一旦回調完成之前.pause()"stream"

var fs = require('fs'), 
    path = require('path'), 
    XmlStream = require('xml-stream'), 
    MongoClient = require('mongodb').MongoClient, 
    url = 'mongodb://username:[email protected]:27017/mydatabase', 
    amount = 0; 

MongoClient.connect(url, function(err, db) { 

    var stream = fs.createReadStream(path.join(__dirname, 'motor.xml')); 
    var xml = new XmlStream(stream); 

    var docs = []; 
    //xml.collect('ns:Statistik'); 

    // This is your event for the element matches 
    xml.on('endElement: ns:Statistik', function(item) { 
     docs.push(item);   // collect to array for insertMany 
     amount++; 

     if (amount % 1000 === 0) { 
      xml.pause();    // pause the stream events 
      db.collection('vehicles').insertMany(docs, function(err, result) { 
      if (err) throw err; 
      docs = [];    // clear the array 
      xml.resume();   // resume the stream events 
      }); 
     } 
    }); 

    // End stream handler - insert remaining and close connection 
    xml.on("end",function() { 
     if (amount % 1000 !== 0) { 
     db.collection('vehicles').insertMany(docs, function(err, result) { 
      if (err) throw err; 
      db.close(); 
     }); 
     } else { 
     db.close(); 
     } 
    }); 

}); 

甚至現代化不是有點:

const fs = require('fs'), 
     path = require('path'), 
     XmlStream = require('xml-stream'), 
     MongoClient = require('mongodb').MongoClient; 

const uri = 'mongodb://username:[email protected]:27017/mydatabase'; 

(async function() { 

    let amount = 0, 
     docs = [], 
     db; 

    try { 

    db = await MongoClient.connect(uri); 

    const stream = fs.createReadStream(path.join(__dirname, 'motor.xml')), 
      xml = new XmlStream(stream); 

    await Promise((resolve,reject) => { 
     xml.on('endElement: ns:Statistik', async (item) => { 
     docs.push(item); 
     amount++; 

     if (amount % 1000 === 0) { 
      try { 
      xml.pause(); 
      await db.collection('vehicle').insertMany(docs); 
      docs = []; 
      xml.resume(); 
      } catch(e) { 
      reject(e) 
      } 
     } 

     }); 

     xml.on('end',resolve); 

     xml.on('error',reject); 
    }); 

    if (amount % 1000 !== 0) { 
     await db.collection('vehicle').insertMany(docs); 
    } 

    } catch(e) { 
    console.error(e); 
    } finally { 
    db.close(); 
    } 

})(); 

注意,MongoClient連接實際上封裝了所有的其他操作。你只需要連接一次,以及其他操作發生在對"stream"事件處理程序。

因此,對於你XMLStream的觸發事件處理程序的表達式匹配和數據提取和收集到一個數組。每調用1000個項目.insertMany()就會插入文檔,在「異步」調用上「暫停」和「恢復」。

一旦完成「結束」事件的"stream"解僱。這是關閉數據庫連接的地方,事件循環將被釋放並結束程序。

儘管可以通過允許各種各樣的.insertMany()調用一次發生(通常爲「合併大小」以避免溢出調用堆棧),從而獲得某種程度的「並行性」,但基本上這是該過程的外觀最簡單的形式就是在等待其他異步I/O完成時簡單地暫停。

注意:註釋從你的原代碼.collect()方法按照follow up question這似乎並不是必需的,實際上是保留真的應該每次使用後丟棄在存儲節點寫入數據庫。

+0

哦,男孩,它看起來像它的作品!我試圖基本上按照自己的意願做出自己的決定,但我無法打開我的頭。我的問題是,它給了我非常不一致的結果。如果我插入1000條記錄,它實際上只會在數據庫中顯示300條記錄(大約在那)。可能是因爲我只是在完成之前隨機關閉連接。非常感謝,尼爾! – MortenMoulder

+0

另一個說明:你有任何線索,爲什麼它開始真正去!大約75000插入後緩慢?當數據庫爲空時,我們正在談論1000/sec,但是當我達到75000時,可能是100-200/sec。 – MortenMoulder

+0

@MortenMoulder使用'.insertMany()'可以看到顯着的改進,但對於吞吐量取決於有多少數據,這是一個完全不同而且非常廣泛的主題。沒有具體細節就需要考慮太多的因素,比如什麼索引(如果有),可用內存,寫入分配和基本硬件。如果您有其他問題,通常最好[提出新問題](https://stackoverflow.com/questions/ask),您可以清楚地表達詳細信息。 –