Neo4j NodeJS并行执行

问题描述:

我想在 NodeJS 中并行执行 Neo4j 查询。同时运行多个查询时,会出现 ECONNRESET 套接字错误或 EMFILE 错误。

“getaddrinfo EMFILE neo4j.dev.com:7687”,或者:

“neo4jSession.run() 出错:Error: 连接被服务器关闭”

以及其他类似的错误。

有没有一种通用且正确的方法,可以对大量并行执行的查询进行排队,同时又不会使会话过载?

这是我的代码——它无法正常工作。

var neo4j = require('neo4j-driver').v1; 
 
var Neo4jConfigurator = require("./config/Neo4jConfigurator.js").Neo4jConfigurator; 
 
var Promise = require("bluebird"); 
 

 
// Connection settings are read from the configurator once, when this module is loaded.
// NOTE(review): this module-level `url` is not referenced anywhere in the visible code;
// setDefaultDriver() re-reads the same setting into its own local `url`.
var url = Neo4jConfigurator.instance().getDBURlIndexer(); 
 
// Credentials passed to neo4j.auth.basic() by every driver created in this module.
var userName = Neo4jConfigurator.instance().getUserName(); 
 
var password = Neo4jConfigurator.instance().getPassword(); 
 
var requestPromise = require("request-promise"); 
 

 

 
/**
 * Wrapper around the Neo4j bolt drivers used by this module.
 *
 * A new instance starts without a master driver, builds its default driver
 * immediately through setDefaultDriver(), and begins with an empty registry
 * of active sessions.
 *
 * @constructor
 */
var Neo4jSession = function () {
    var self = this;

    // Stays null until setMasterDriver() assigns a driver.
    self.masterDriver = null;
    self.defaultDriver = self.setDefaultDriver();
    // Sessions are recorded here by addActiveSession().
    self.activeSessions = {};
};
 

 
/**
 * Accessor for the registry of sessions tracked by this instance.
 *
 * @returns {Object} the live `activeSessions` map itself (not a copy)
 */
Neo4jSession.prototype.getActiveSessions = function () {
    var trackedSessions = this.activeSessions;
    return trackedSessions;
};
 

 

 
/**
 * Accessor for the master driver.
 *
 * @returns {*} whatever is stored in `masterDriver`; the constructor
 *     initialises it to null and setMasterDriver() replaces it
 */
Neo4jSession.prototype.getMasterDriver = function () {
    var currentMaster = this.masterDriver;
    return currentMaster;
};
 

 
/**
 * Creates a bolt driver for the given endpoint and stores it as the master driver.
 *
 * The URL is `bolt://<endPoint>:<boltPort>`, with the port taken from the
 * configurator. The driver is created with basic auth from the module-level
 * credentials and the options `{encrypted: false, connectionPoolSize: 10}`.
 * Any previously stored master driver is simply replaced.
 *
 * @param {string} endPoint - host part of the bolt URL
 */
Neo4jSession.prototype.setMasterDriver = function (endPoint) {
    var port = Neo4jConfigurator.instance().getBoltPort();
    var boltUrl = "bolt://" + endPoint + ":" + port;
    var credentials = neo4j.auth.basic(userName, password);
    var driverOptions = { encrypted: false, connectionPoolSize: 10 };

    this.masterDriver = neo4j.driver(boltUrl, credentials, driverOptions);
};
 

 
/**
 * Builds the default bolt driver.
 *
 * Despite the "set" in its name, this method does not assign anything on the
 * instance: it only returns the new driver (the constructor stores it in
 * `defaultDriver`). The URL is `bolt://` followed by the configurator's
 * getDBURlIndexer() value.
 *
 * @returns {Object} the driver returned by neo4j.driver()
 */
Neo4jSession.prototype.setDefaultDriver = function () {
    var indexerAddress = Neo4jConfigurator.instance().getDBURlIndexer();
    var boltUrl = "bolt://" + indexerAddress;
    var credentials = neo4j.auth.basic(userName, password);

    return neo4j.driver(boltUrl, credentials, { encrypted: false, connectionPoolSize: 10 });
};
 

 
/**
 * Accessor for the default driver.
 *
 * @returns {*} the value stored in `defaultDriver`
 */
Neo4jSession.prototype.getDefaultDriver = function () {
    var currentDefault = this.defaultDriver;
    return currentDefault;
};
 

 
/**
 * Opens a new session on the default driver.
 *
 * The session is only returned; it is not recorded in the active-session
 * registry by this method.
 *
 * @returns {Object} the value returned by the default driver's session()
 */
Neo4jSession.prototype.addDefaultSession = function () {
    return this.getDefaultDriver().session();
};
 

 
/**
 * Runs a Cypher query on a fresh session of the default driver.
 *
 * The session is recorded through addActiveSession() before the query is
 * started. It is not closed here; closeAllSessions() closes recorded sessions.
 *
 * @param {string} cypherQuery - query handed to session.run()
 * @returns {*} whatever session.run() returns
 */
Neo4jSession.prototype.run = function (cypherQuery) {
    var defaultSession = this.addDefaultSession();

    this.addActiveSession(defaultSession);

    var pendingResult = defaultSession.run(cypherQuery);
    return pendingResult;
};
 

 
/**
 * Runs a Cypher query on a fresh session of the master driver.
 *
 * The session is recorded through addActiveSession() before the query is
 * started. The master driver must already be set: while it is still null
 * (its initial value) the session() call throws.
 *
 * @param {string} cypherQuery - query handed to session.run()
 * @returns {*} whatever session.run() returns
 */
Neo4jSession.prototype.runMasterSession = function (cypherQuery) {
    var sessionOnMaster = this.getMasterDriver().session();

    this.addActiveSession(sessionOnMaster);

    var pendingResult = sessionOnMaster.run(cypherQuery);
    return pendingResult;
};
 

 
/**
 * Closes every recorded session and removes it from the registry.
 *
 * The ids are snapshotted first, then each session is closed and its entry
 * deleted. If a close() call throws, the exception propagates and the
 * remaining sessions are left untouched.
 */
Neo4jSession.prototype.closeAllSessions = function () {
    var registry = this.getActiveSessions();
    var sessionIds = Object.keys(registry);

    for (var i = 0; i < sessionIds.length; i += 1) {
        var sessionId = sessionIds[i];
        registry[sessionId].close();
        delete registry[sessionId];
    }
};
 

 
/**
 * Returns the registry of sessions tracked by this instance.
 *
 * NOTE(review): the same accessor is already assigned earlier in this file;
 * this second assignment overwrites it with an equivalent function.
 *
 * @returns {Object} the live `activeSessions` map itself (not a copy)
 */
Neo4jSession.prototype.getActiveSessions = function () {
    var self = this;
    return self.activeSessions;
};
 

 
/**
 * Records a session in the active-session registry so that
 * closeAllSessions() can close it later.
 *
 * The id starts at "number of recorded sessions + 1". Because the registry
 * object is exposed through getActiveSessions(), entries can be removed
 * individually; in that case the count-based id may still be in use, so the
 * id is advanced until a free one is found instead of overwriting (and
 * thereby losing track of) a session that is still open.
 *
 * @param {Object} session - session to record
 */
Neo4jSession.prototype.addActiveSession = function (session) {
    var activeSessions = this.getActiveSessions();
    var sessionId = Object.keys(activeSessions).length + 1;

    // Never reuse an id that still maps to a recorded session.
    while (Object.prototype.hasOwnProperty.call(activeSessions, sessionId)) {
        sessionId += 1;
    }

    activeSessions[sessionId] = session;
};
 

 
/**
 * Finds the master among the configured endpoints and points the master
 * driver at it.
 *
 * Each endpoint is asked with `GET http://<endPoint>:<httpPort><masterRoute>`.
 * Endpoints are probed one at a time, in the order returned by
 * getEndPoints(). The first one whose response body (with NUL characters
 * replaced by spaces) parses to JSON `true` is passed to setMasterDriver();
 * endpoints after it are not contacted.
 *
 * A failed request whose `err.error` is 'false' is treated as "this endpoint
 * is not the master" and ignored. Any other request failure, a body that is
 * not valid JSON, or an exception from setMasterDriver() rejects the
 * returned promise with that error.
 *
 * @returns {Promise<string>} resolves with the master endpoint; rejects with
 *     an Error when no endpoint reports itself as master (including an empty
 *     endpoint list), so callers are never left waiting forever
 */
Neo4jSession.prototype.initMaster = function()
{
    var neo4jSessionObj = this;
    var configurator = Neo4jConfigurator.instance();
    var masterRoute = configurator.getMasterRoute();
    var httpPort = configurator.getHttpPort();
    var masterEndPoint = null;

    return Promise.each(configurator.getEndPoints(), function(endPoint){
        // Promise.each cannot break out early, so skip the rest once a master is known.
        if (masterEndPoint !== null)
        {
            return;
        }

        var options = {
            uri: 'http://' + endPoint + ":" + httpPort + masterRoute,
            method: "GET",
            headers: {
                'Accept-Charset': 'utf-8'
            }
        };

        // Returning the request makes Promise.each wait for it before moving on.
        return requestPromise(options).then(function(body){
            var json = JSON.parse(body.replace(/\0/g, ' '));

            if (json === true)
            {
                neo4jSessionObj.setMasterDriver(endPoint);
                masterEndPoint = endPoint;
            }
        }, function(err){
            // An error payload of 'false' only means "not the master"; anything else is a real failure.
            if (!err.error || String(err.error) !== 'false')
            {
                throw err;
            }
        });
    }).then(function(){
        if (masterEndPoint === null)
        {
            throw new Error('No Neo4j master found among the configured endpoints');
        }

        return masterEndPoint;
    });
}
 

 

 

 
// Public API of this module: the Neo4jSession constructor.
module.exports = { 
 
    Neo4jSession: Neo4jSession 
 
}

这段代码很长(而且有重复),究竟是哪里不起作用?

您不需要保留会话,您可以在查询运行后丢弃它们。

另外,在同一个会话/事务中,您可以使用 async.serial 逐个运行多个查询。或者,如果您想批量查询/更新,可以使用

UNWIND {data} as row MATCH ...

把它加在您的查询前面,并传入一个 {data:[ {foo:bar}, {foo:bar2}, ...]} 形式的参数,其中可以包含例如 1 万个对象。

+0

你能分享一下这两种解决方案的例子吗? –