2017-07-02 63 views
1

我想在 NodeJS 中以並行方式執行 Neo4j 查詢。同時運行多個查詢時,會出現套接字錯誤,例如 ECONNRESET 或 EMFILE。(標題:Neo4j NodeJS 並行執行)

「getaddrinfo EMFILE neo4j.dev.com:7687」,或:

neo4jSession.run() 出錯:「Error: Connection was closed by server」(連接被服務器關閉)

以及其他錯誤。

是否有任何常見的正確方法排隊大量的並行執行,同時不會超載會話?

這是我的代碼——它無法正常運作。

var neo4j = require('neo4j-driver').v1; 
 
var Neo4jConfigurator = require("./config/Neo4jConfigurator.js").Neo4jConfigurator; 
 
var Promise = require("bluebird"); 
 

 
// Connection settings, read once from the shared Neo4jConfigurator when this module loads.
// NOTE(review): this module-level `url` is not referenced by any code visible in this file
// (setDefaultDriver() declares its own local `url`), so it looks unused - confirm before removing.
var url = Neo4jConfigurator.instance().getDBURlIndexer(); 
 
// Credentials handed to neo4j.auth.basic() whenever a driver is created below.
var userName = Neo4jConfigurator.instance().getUserName(); 
 
var password = Neo4jConfigurator.instance().getPassword(); 
 
var requestPromise = require("request-promise"); 
 

 

 
/**
 * Wraps the Neo4j bolt drivers used by this module and keeps a registry of
 * the sessions opened through them.
 *
 * A fresh instance has no master driver, a default driver obtained from
 * setDefaultDriver(), and an empty registry of active sessions.
 *
 * @constructor
 */
var Neo4jSession = function () {
    var instance = this;

    // Stays null until setMasterDriver() is called.
    instance.masterDriver = null;

    // setDefaultDriver() only builds and returns the driver; it is stored here.
    instance.defaultDriver = instance.setDefaultDriver();

    // Registry of opened sessions, keyed by the ids assigned in addActiveSession().
    instance.activeSessions = {};
};
 

 
/**
 * Returns the registry of sessions opened through this object.
 *
 * The live registry object is returned (not a copy).
 * NOTE(review): the same method is assigned a second time further down in
 * this file with an equivalent body, so one of the two assignments is
 * redundant.
 *
 * @returns {Object} this instance's activeSessions registry
 */
Neo4jSession.prototype.getActiveSessions = function () {
    var registry = this.activeSessions;
    return registry;
};
 

 

 
/**
 * Returns the driver currently stored as the master driver.
 *
 * @returns {?Object} the value of this.masterDriver; null until
 *     setMasterDriver() has been called on this instance
 */
Neo4jSession.prototype.getMasterDriver = function () {
    var currentMaster = this.masterDriver;
    return currentMaster;
};
 

 
/**
 * Creates a bolt driver for the given endpoint and stores it as the master
 * driver of this instance.
 *
 * The bolt port comes from Neo4jConfigurator; the credentials are the
 * module-level userName / password. Any previously stored master driver is
 * simply replaced (it is not closed here).
 *
 * @param {string} endPoint host part of the bolt URL
 */
Neo4jSession.prototype.setMasterDriver = function (endPoint) {
    var port = Neo4jConfigurator.instance().getBoltPort();
    var boltUrl = "bolt://" + endPoint + ":" + port;
    var credentials = neo4j.auth.basic(userName, password);
    var driverConfig = { encrypted: false, connectionPoolSize: 10 };

    this.masterDriver = neo4j.driver(boltUrl, credentials, driverConfig);
};
 

 
/**
 * Builds a bolt driver for the address returned by
 * Neo4jConfigurator.getDBURlIndexer().
 *
 * Despite the "set" in its name this method assigns nothing: it only returns
 * the new driver, and the constructor stores the result in defaultDriver.
 *
 * @returns {Object} the driver created by neo4j.driver()
 */
Neo4jSession.prototype.setDefaultDriver = function () {
    var boltUrl = "bolt://" + Neo4jConfigurator.instance().getDBURlIndexer();
    var credentials = neo4j.auth.basic(userName, password);
    var driverConfig = { encrypted: false, connectionPoolSize: 10 };

    return neo4j.driver(boltUrl, credentials, driverConfig);
};
 

 
/**
 * Returns the driver stored as this instance's default driver.
 *
 * @returns {Object} the value of this.defaultDriver
 */
Neo4jSession.prototype.getDefaultDriver = function () {
    var driver = this.defaultDriver;
    return driver;
};
 

 
/**
 * Opens a new session on the default driver and returns it.
 *
 * The session is NOT added to the active-session registry here; run() does
 * that separately through addActiveSession().
 *
 * @returns {Object} the session returned by the default driver's session()
 */
Neo4jSession.prototype.addDefaultSession = function () {
    return this.getDefaultDriver().session();
};
 

 
/**
 * Runs a Cypher statement on a new session of the default driver.
 *
 * The session is registered through addActiveSession() before the statement
 * is sent. It is not closed here; it stays in the registry until
 * closeAllSessions() is called, so callers issuing many queries must call
 * that regularly.
 *
 * @param {string} cypherQuery Cypher statement text
 * @param {Object} [parameters] optional statement parameters, forwarded
 *     unchanged to session.run(); prefer these over building values into the
 *     query string (they also enable batched "UNWIND {data} AS row ..."
 *     statements). When omitted, session.run() is called with the statement
 *     only, exactly as before.
 * @returns {*} whatever session.run() returns
 */
Neo4jSession.prototype.run = function (cypherQuery, parameters) {
    var session = this.addDefaultSession();

    this.addActiveSession(session);

    if (parameters === undefined) {
        return session.run(cypherQuery);
    }

    return session.run(cypherQuery, parameters);
};
 

 
/**
 * Runs a Cypher statement on a new session of the master driver.
 *
 * The session is registered through addActiveSession() before the statement
 * is sent. It is not closed here; it stays in the registry until
 * closeAllSessions() is called.
 *
 * @param {string} cypherQuery Cypher statement text
 * @param {Object} [parameters] optional statement parameters, forwarded
 *     unchanged to session.run(). When omitted, session.run() is called with
 *     the statement only, exactly as before.
 * @returns {*} whatever session.run() returns
 * @throws {TypeError} when no master driver has been set yet
 */
Neo4jSession.prototype.runMasterSession = function (cypherQuery, parameters) {
    var driver = this.getMasterDriver();

    // masterDriver is null until setMasterDriver() runs; fail with a message
    // that names the cause instead of "cannot read property 'session' of null".
    if (!driver) {
        throw new TypeError('Master driver is not set; call initMaster() or setMasterDriver() before runMasterSession()');
    }

    var masterSession = driver.session();

    this.addActiveSession(masterSession);

    if (parameters === undefined) {
        return masterSession.run(cypherQuery);
    }

    return masterSession.run(cypherQuery, parameters);
};
 

 
/**
 * Closes every session in the active-session registry and removes it from
 * the registry.
 *
 * Sessions are handled in Object.keys() order. An entry is deleted only after
 * its close() call has returned, so if close() throws, that entry and all
 * later ones remain registered and the exception propagates to the caller.
 */
Neo4jSession.prototype.closeAllSessions = function () {
    var registry = this.getActiveSessions();
    var sessionIds = Object.keys(registry);

    for (var i = 0; i < sessionIds.length; i += 1) {
        var sessionId = sessionIds[i];

        registry[sessionId].close();
        delete registry[sessionId];
    }
};
 

 
/**
 * Returns the registry of sessions opened through this object.
 *
 * The live registry object is returned (not a copy).
 * NOTE(review): this re-assigns a method that is already defined earlier in
 * this file with an equivalent body, so this second assignment is redundant.
 *
 * @returns {Object} this instance's activeSessions registry
 */
Neo4jSession.prototype.getActiveSessions = function () {
    var registry = this.activeSessions;
    return registry;
};
 

 
/**
 * Adds a session to the active-session registry under a numeric id.
 *
 * The id starts at "number of registered sessions + 1", which yields
 * 1, 2, 3, ... as long as the registry is only ever emptied as a whole.
 * Because getActiveSessions() hands out the live registry, single entries
 * can be removed by callers; in that case the count-based id may already be
 * in use, so the id is advanced until a free slot is found rather than
 * overwriting (and thereby losing track of) a session that is still open.
 *
 * @param {Object} session the session to register
 */
Neo4jSession.prototype.addActiveSession = function (session) {
    var sessions = this.getActiveSessions();
    var sessionId = Object.keys(sessions).length + 1;

    // Skip ids that are still occupied so no registered session is replaced.
    while (Object.prototype.hasOwnProperty.call(sessions, sessionId)) {
        sessionId += 1;
    }

    sessions[sessionId] = session;
};
 

 
/**
 * Finds the master among the configured endpoints and points the master
 * driver at it.
 *
 * For every endpoint from Neo4jConfigurator.getEndPoints() an HTTP GET is
 * sent to "http://<endPoint>:<httpPort><masterRoute>". All requests are
 * started without waiting for one another.
 *   - A response whose JSON body is `true` marks that endpoint as master:
 *     setMasterDriver(endPoint) is called and the promise resolves with the
 *     endpoint.
 *   - A request rejected with `err.error === 'false'` is ignored
 *     (presumably a non-master node answering with a non-2xx status and the
 *     body "false" - confirm against the server's behaviour).
 *   - Any other failure (transport error, unparsable body, ...) rejects the
 *     promise with that error.
 *   - If every probe finishes and none reported master, the promise rejects
 *     with an Error instead of staying pending forever.
 *
 * @returns {Promise<string>} resolves with the master endpoint
 */
Neo4jSession.prototype.initMaster = function () {
    var neo4jSessionObj = this;
    var configurator = Neo4jConfigurator.instance();

    // An explicit promise is used on purpose: the result must settle as soon
    // as the first probe reports master (or fails hard), not when all probes
    // are done. Later resolve/reject calls on a settled promise are no-ops.
    return new Promise(function (resolve, reject) {

        // Probes one endpoint; the returned promise never rejects unless the
        // error handler itself fails, so Promise.map below waits for all probes.
        function probeEndPoint(endPoint) {
            var options = {
                uri: 'http://' + endPoint + ":" + configurator.getHttpPort() + configurator.getMasterRoute(),
                method: "GET",
                headers: {
                    'Accept-Charset': 'utf-8'
                }
            };

            return requestPromise(options)
                .then(function (body) {
                    // NUL characters are replaced before parsing, as in the
                    // original implementation; a parse error lands in the
                    // catch handler below and rejects the outer promise.
                    var json = JSON.parse(body.replace(/\0/g, ' '));

                    if (json === true) {
                        neo4jSessionObj.setMasterDriver(endPoint);
                        resolve(endPoint);
                    }
                })
                .catch(function (err) {
                    if (!err || err.error !== 'false') {
                        reject(err);
                    }
                });
        }

        Promise.map(configurator.getEndPoints(), probeEndPoint)
            .then(function () {
                // No-op when a master was found, because the promise is already resolved.
                reject(new Error('No Neo4j master found among the configured endpoints'));
            })
            .catch(reject);
    });
};
 

 

 

 
// Public surface of this module: only the Neo4jSession constructor is exported.
module.exports = { Neo4jSession: Neo4jSession };

回答

0

這是一個很大的(重複)的代碼,究竟是什麼不起作用?

您不需要保留會話,您可以在查詢運行後丟棄它們。

同樣,在同一個會話/事務中,您可以使用 async.serial 逐個運行多個查詢。或者,如果您想批量執行查詢/更新,可以在查詢前面加上

UNWIND {data} as row MATCH ...

並傳入一個形如 {data:[ {foo:bar}, {foo:bar2}, ...]} 的參數,其中可以包含例如 1 萬個對象。

+0

你能為這兩種解決方案各分享一個例子嗎? –