2017-02-17 125 views
13

我們做約50萬的元素讀取(使用xml-stream)的XML文件並執行把它們插入到MongoDB的是這樣的:如何在node.js斷開連接期間緩衝MongoDB插入?

xml.on(`endElement: product`, writeDataToDb.bind(this, "product")); 

插入在writeDataToDb(type, obj)看起來是這樣的:

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { }); 

現在蒙戈時連接斷開連接,xml流仍會讀取,並且控制檯將充滿錯誤消息(無法插入,斷開連接,EPIPE斷開...)。

docs它說:

當您關閉mongod的過程中,駕駛員停止處理操作,並保持緩衝它們由於bufferMaxEntries是-1默認情況下,這意味着緩存中的所有操作。

這個緩衝區實際上做了什麼?

當我們插入數據並關閉蒙戈服務器我們注意到,這個事情得到緩衝,然後我們把蒙戈服務器備份,本地驅動程序成功地重新連接和節點恢復插入數據,但緩存文件(蒙戈beeing離線時)不要再插入。

所以我質疑這個緩衝區及其用法。

目標:

我們正在尋找保持緩衝區插入的最佳方式,直到蒙戈回來(在根據wtimeout 15000milliseconds),並讓然後將緩衝的文件或利用xml.pause();xml.resume()這我們嘗試沒有成功。

基本上,我們需要一點幫助,以便如何處理斷開連接而沒有數據丟失或中斷。

+0

不能複製此,無論是在使用'XML的stream'插入一旦週一緩衝的對象的文檔和測試的示例去服務器備份..也許你可以發佈更多的代碼/提供有關您的設置的更多信息? – cviejo

+0

@cviejo我不能分享我的腳本,因爲它們是公司相關的,但是您會介意將您嘗試複製的腳本發給我嗎? Gist/pastebin就可以了。 – DanFromGermany

回答

1

我不知道Mongodb驅動程序和這個緩衝區條目。也許它只保留特定場景中的數據。

所以我會用更一般的方法來回答這個問題,它可以用於任何數據庫。

總之,你有兩個問題:

  1. 您還沒有從失敗的嘗試恢復
  2. XML流發送數據太快

要處理的第一個問題,你需要實現一種重試算法,可確保在放棄之前進行多次嘗試。

要處理第二個問題,您需要在xml流上實現背壓。您可以使用pause方法,resume方法和輸入緩衝區來完成此操作。

var Promise = require('bluebird'); 
var fs = require('fs'); 
var Xml = require('xml-stream'); 

var fileStream = fs.createReadStream('myFile.xml'); 
var xml = new Xml(fileStream); 

// simple exponential retry algorithm based on promises 
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) { 
    var delay = initialDelay; 
    var retry = 0; 
    var closure = function() { 
     return task().catch(function(error) { 
      retry++; 
      if (retry > maxRetry) { 
       throw error 
      } 
      var promise = Promise.delay(delay).then(closure); 
      delay = Math.min(delay * 2, maxDelay); 
      return promise; 
     }) 
    }; 
    return closure(); 
} 

var maxPressure = 100; 
var currentPressure = 0; 
var suspended = false; 
var stopped = false; 
var buffer = []; 

// handle back pressure by storing incoming tasks in the buffer 
// pause the xml stream as soon as we have enough tasks to work on 
// resume it when the buffer is empty 
function writeXmlDataWithBackPressure(product) { 
    // closure used to try to start a task 
    var tryStartTask = function() { 
     // if we have enough tasks running, pause the xml stream 
     if (!stopped && !suspended && currentPressure >= maxPressure) { 
      xml.pause(); 
      suspended = true; 
      console.log("stream paused"); 
     } 
     // if we have room to run tasks 
     if (currentPressure < maxPressure) { 
      // if we have a buffered task, start it 
      // if not, resume the xml stream 
      if (buffer.length > 0) { 
       buffer.shift()(); 
      } else if (!stopped) { 
       try { 
        xml.resume(); 
        suspended = false; 
        console.log("stream resumed"); 
       } catch (e) { 
        // the only way to know if you've reached the end of the stream 
        // xml.on('end') can be triggered BEFORE all handlers are called 
        // probably a bug of xml-stream 
        stopped = true; 
        console.log("stream end"); 
       } 
      } 
     } 
    }; 

    // push the task to the buffer 
    buffer.push(function() { 
     currentPressure++; 
     // use exponential retry to ensure we will try this operation 100 times before giving up 
     exponentialRetry(function() { 
      return writeDataToDb(product) 
     }, 100, 2000, 100).finally(function() { 
      currentPressure--; 
      // a task has just finished, let's try to run a new one 
      tryStartTask(); 
     }); 
    }); 

    // we've just buffered a task, let's try to run it 
    tryStartTask(); 
} 

// write the product to database here :) 
function writeDataToDb(product) { 
    // the following code is here to create random delays and random failures (just for testing) 
    var timeToWrite = Math.random() * 100; 
    var failure = Math.random() > 0.5; 
    return Promise.delay(timeToWrite).then(function() { 
     if (failure) { 
      throw new Error(); 
     } 
     return null; 
    }) 
} 

xml.on('endElement: product', writeXmlDataWithBackPressure); 

玩它,把一些console.log瞭解它的行爲。 我希望這會幫助你解決你的問題:)

+0

這基本上是一個很好的實現,但我希望能夠利用mongo的內部寫入關注/寫入緩衝區 - 請查看[本頁](https://mongodb.github.io/node-mongodb- native/drivers-articles/anintroductionto1_4_and_2_6.html)和關鍵字'bufferMaxEntries'。 – DanFromGermany

2

插入500K元素與insertOne()是一個非常糟糕的主意。您應該使用bulk operations,它允許您在單個請求中插入許多文檔。 (這裏例如10000,因此它可以在50個單請求完成) 爲了避免緩衝問題,您可以手動處理:

  1. 禁用緩衝帶bufferMaxEntries: 0
  2. 設置重新連接性能:reconnectTries: 30, reconnectInterval: 1000
  3. 創建批量操作並用10000個項目填充
  4. 暫停xml閱讀器。嘗試插入10000個項目。如果失敗,重試每3000ms,直到它成功
  5. 你可能會面臨一些重複的ID問題,如果批量操作的執行過程中中斷,所以不理會他們(錯誤代碼:11000)

這裏是一個示例腳本:

var fs = require('fs') 
var Xml = require('xml-stream') 

var MongoClient = require('mongodb').MongoClient 
var url = 'mongodb://localhost:27017/test' 

MongoClient.connect(url, { 
    reconnectTries: 30, 
    reconnectInterval: 1000, 
    bufferMaxEntries: 0 
}, function (err, db) { 
    if (err != null) { 
    console.log('connect error: ' + err) 
    } else { 
    var collection = db.collection('product') 
    var bulk = collection.initializeUnorderedBulkOp() 
    var totalSize = 500001 
    var size = 0 

    var fileStream = fs.createReadStream('data.xml') 
    var xml = new Xml(fileStream) 
    xml.on('endElement: product', function (product) { 
     bulk.insert(product) 
     size++ 
     // if we have enough product, save them using bulk insert 
     if (size % 10000 == 0) { 
     xml.pause() 
     bulk.execute(function (err, result) { 
      if (err == null) { 
      bulk = collection.initializeUnorderedBulkOp() 
      console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try') 
      xml.resume() 
      } else { 
      console.log('bulk insert failed: ' + err) 
      counter = 0 
      var retryInsert = setInterval(function() { 
       counter++ 
       bulk.execute(function (err, result) { 
       if (err == null) { 
        clearInterval(retryInsert) 
        bulk = collection.initializeUnorderedBulkOp() 
        console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
        xml.resume() 
       } else if (err.code === 11000) { // ignore duplicate ID error 
        clearInterval(retryInsert) 
        bulk = collection.initializeUnorderedBulkOp() 
        console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
        xml.resume() 
       } else { 
        console.log('failed after first try: ' + counter, 'error: ' + err) 
       } 
       }) 
      }, 3000) // retry every 3000ms until success 
      } 
     }) 
     } else if (size === totalSize) { 
     bulk.execute(function (err, result) { 
      if (err == null) { 
      db.close() 
      } else { 
      console.log('bulk insert failed: ' + err) 
      } 
     }) 
     } 
    }) 
    } 
}) 

示例日誌輸出:

doc 0 : 10000 saved on first try 
doc 10000 : 20000 saved on first try 
doc 20000 : 30000 saved on first try 
[...] 
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown 
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0 
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0 
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0 
doc 130000 : 140000 saved after 4 tries 
doc 140000 : 150000 saved on first try 
[...] 
+0

您的回答沒有提供有關mongo寫入緩衝區的信息,也沒有解決如何在複製副本集或斷開連接期間插入所有文檔的解決方案。有關批量插入的信息很有趣,我會仔細研究一下,謝謝! – DanFromGermany

+0

@DanFromGermany是的,因爲對我來說,它看起來像你試圖解決錯誤的問題:真正的問題是你的應用程序與數據庫斷開連接。減少對數據庫的調用時,自動重新連接會更容易,因此不需要進行寫入緩衝 – felix

+0

我的應用程序**不會從數據庫中斷開連接。我希望編寫應用程序**,以便在**斷開連接時*或*副本集中的主交換機斷言重新連接並寫入所有數據。 – DanFromGermany

相關問題