2016-08-07 112 views
2

我想使用 NodeJS 將超過 100 萬行插入到 Postgres 表中。問題是,當我啓動腳本時,內存不斷增加,直到達到 1.5 GB 的 RAM,然後我得到錯誤:FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory(使用 NodeJS 將大量行插入到 Postgres DB 中)

無論怎麼嘗試,結果都是一樣的——只插入了大約 7000 行,而不是 100 萬行

下面是代碼

var pg = require('pg');
var fs = require('fs');
var config = require('./config.js');

// Single shared client, connected once before the streaming starts.
var PgClient = new pg.Client(config.pg);
PgClient.connect();

// Stream the CSV one line at a time instead of loading it into memory.
var lineReader = require('readline').createInterface({
     input: fs.createReadStream('resources/database.csv') //file contains over 1 million lines
    });
var n=0;

lineReader.on('line', function(line) {
     n++;
     var insert={"firstname":"John","lastname":"Conor"};

     // Back-pressure fix: pause the reader until the current INSERT has been
     // acknowledged. Without this, pg queues every query internally while the
     // file is read much faster than the DB can insert, and the queue grows
     // until V8 aborts with CALL_AND_RETRY_LAST / out-of-memory.
     lineReader.pause();
     PgClient.query('INSERT INTO HUMANS (firstname,lastname) values ($1,$2)',
         [insert.firstname, insert.lastname],
         function(err) {
             if (err) console.log(err); // surface failed inserts instead of dropping them silently
             lineReader.resume();
         });
});

lineReader.on('close',function() {
    console.log('end '+n);
});
+0

你試過每收到一行就暫停 lineReader,等查詢的回調被調用後再恢復嗎?我認爲是排隊的查詢太多,耗盡了進程的內存。 – mscdex

+0

我在查詢之前加了 lineReader.pause(),在查詢之後加了 lineReader.resume(),但看起來不起作用,還是同樣的錯誤 – m3n1at

+2

請考慮進行批量插入。每行插入太昂貴。 –

回答

-1

所以我解決了這個問題。PgClient.queryQueue 的處理速度比文件讀取速度慢得多,讀取大文件時隊列就會溢出。解決方案:修改 lineReader.on('line', cb) 部分,每當隊列中積壓了太多元素時就暫停 lineReader。

// Throttle the reader against the pg client's internal queue: once more than
// 15 000 queries are pending, stop emitting lines until the queue drains.
lineReader.on('line', function(line) {
    n++;
    var row = {"firstname":"John","lastname":"Conor"};
    var onDone = function (err, result) {
        if (err) console.log(err);
        // NOTE(review): queryQueue is a private pg internal — verify it still
        // exists in the pg version in use before relying on it.
        if (PgClient.queryQueue.length > 15000) {
            lineReader.pause();
        } else {
            lineReader.resume();
        }
    };
    PgClient.query('INSERT INTO HUMANS (firstname,lastname) values ($1,$2)',
        [row.firstname, row.lastname], onDone);
});
+0

這是一個糟糕的解決方案,而正確的解決方案很簡單 - 從批處理文件中讀取大約1000-10,000個插入行,並將每個這樣的讀取插入批處理中。此外,您需要連接插入 - 請參見[性能提升](https://github.com/vitaly-t/pg-promise/wiki/Performance-Boost)。 –

1

我按照 vitaly-t 的建議使用了 pg-promise,這段代碼運行得非常快

const fs = require('fs'); // NOTE(review): unused below — readline re-requires 'fs' inline
const pgp = require('pg-promise')(); // pg-promise, initialized with default options
const config = require('./config.js'); // Postgres connection settings live in config.pg

// Db connection (pg-promise manages its own pool)
const db = pgp(config.pg);

// Transform a lot of inserts into one 
// Formats an array of row objects into the VALUES list of a single multi-row
// INSERT, e.g. "(a,b),(c,d)", using pg-promise's Custom Type Formatting
// protocol (_rawType + toPostgres).
function Inserts(template, data) {
    // Support calling without `new`.
    if (!(this instanceof Inserts)) {
        return new Inserts(template, data);
    }
    this._rawType = true; // inject the formatted text as raw SQL, not as a quoted value
    this.toPostgres = () => {
        const rows = data.map((d) => `(${pgp.as.format(template, d)})`);
        return rows.join(); // default separator is ','
    };
}

// insert Template 
// Template for one CSV row: every column starts as null so that an empty CSV
// field falls through to NULL in the database. Key order matters — the 'line'
// handler maps CSV columns onto these properties with for...in.
function Insert() {
    const blank = {
        firstname: null,
        lastname: null,
        birthdate: null,
        phone: null,
        email: null,
        city: null,
        district: null,
        location: null,
        street: null,
    };
    return blank;
}
const lineReader = require('readline').createInterface({
     input: require('fs').createReadStream('resources/database.csv')
    });


let n = 0;             // rows counted toward the current batch
// BUG FIX: must be `let`, not `const` — the 'line' handler reassigns it
// (InsertArray = []) after flushing each batch; with `const` the very first
// batch throws "TypeError: Assignment to constant variable".
let InsertArray = [];

// Accumulate parsed rows and flush them to the DB in batches of 10 000,
// pausing the reader while each batch insert is in flight.
lineReader.on('line', function(line) {
     const insert = new Insert();
     n++;
     const InsertValues = line.split(','); // NOTE(review): naive split — breaks on quoted commas; confirm CSV is simple
     if (InsertValues[0] !== '"Firstname"') { //skip first (header) line
      // Map CSV columns onto the template in declaration order;
      // empty fields keep their null default.
      let i = 0;
      for (const prop in insert) {
       insert[prop] = (InsertValues[i] === '') ? insert[prop] : InsertValues[i];
       i++;
      }
      InsertArray.push(insert);
      if (n === 10000) {
       lineReader.pause(); // back-pressure: stop reading while the batch inserts
       // convert insert array into one multi-row insert
       const values = new Inserts('${firstname}, ${lastname},${birthdate},${phone},${email},${city},${district},${location},${street}', InsertArray);
       db.none('INSERT INTO users (firstname, lastname,birthdate,phone,email,city,district,location,street) VALUES $1', values)
       .then(data => {
        n = 0;
        // BUG FIX: original did `InsertArray = []`, which throws a TypeError
        // because InsertArray is declared `const`; emptying in place works
        // regardless of how it was declared.
        InsertArray.length = 0;
        lineReader.resume();
       })
       .catch(error => {
        console.log(error);
        // BUG FIX: the original never resumed on error, so one failed batch
        // paused the reader forever and the import silently hung.
        n = 0;
        InsertArray.length = 0;
        lineReader.resume();
       });
      }
     }
});


// Flush whatever is left after the reader finishes (the final partial batch).
lineReader.on('close',function() {
    console.log('end '+n);
    // BUG FIX: test the buffered rows, not `n` — `n` is incremented for the
    // skipped header line too, so it can be > 0 with nothing buffered, and the
    // original would then issue an INSERT with an empty VALUES list (SQL error).
    if (InsertArray.length > 0) {
     const values = new Inserts('${firstname}, ${lastname},${birthdate},${phone},${email},${city},${district},${location},${street}', InsertArray);
     db.none('INSERT INTO users (firstname, lastname,birthdate,phone,email,city,district,location,street) VALUES $1', values)
      .then(data => {
       console.log('Last');
      })
      .catch(error => {
       console.log(error);
      });
    }
});
+0

最佳示例:[數據導入](https://github.com/vitaly-t/pg-promise/wiki/Data-Imports)。 –

+0

我已經更新了符合最新pg-promise v6.5.0的代碼示例 –