Neo4j NodeJS并行执行
问题描述:
我想在 NodeJS 中并行执行 Neo4j 查询。同时运行多个查询时,会出现 ECONNRESET 套接字错误或 EMFILE 错误:
“getaddrinfo EMFILE neo4j.dev.com:7687” 或:
neo4jSession.run() 出错:错误:连接被服务器关闭,
以及其他类似的错误。
有没有通用且正确的方法,可以让大量并行执行排队进行,同时又不会使会话过载?
这是我的代码——它无法正常工作。
var neo4j = require('neo4j-driver').v1;
var Neo4jConfigurator = require("./config/Neo4jConfigurator.js").Neo4jConfigurator;
var Promise = require("bluebird");
var url = Neo4jConfigurator.instance().getDBURlIndexer();
var userName = Neo4jConfigurator.instance().getUserName();
var password = Neo4jConfigurator.instance().getPassword();
var requestPromise = require("request-promise");
/**
 * Holds the Neo4j drivers used by this module together with a registry of
 * the sessions that were opened through it.
 *
 * @constructor
 */
var Neo4jSession = function () {
    var self = this;

    // No master driver yet; setMasterDriver() assigns one later.
    self.masterDriver = null;
    // The default driver is created immediately via setDefaultDriver().
    self.defaultDriver = self.setDefaultDriver();
    // Registry of tracked sessions, keyed by a numeric id; starts empty.
    self.activeSessions = {};
};
/**
 * Accessor for the session registry.
 *
 * @returns {Object} The live `activeSessions` object (not a copy).
 */
Neo4jSession.prototype.getActiveSessions = function () {
    var registry = this.activeSessions;
    return registry;
};
/**
 * Accessor for the master driver.
 *
 * @returns {*} Whatever is stored in `masterDriver`; the constructor
 *     initialises it to `null` and setMasterDriver() replaces it.
 */
Neo4jSession.prototype.getMasterDriver = function () {
    var master = this.masterDriver;
    return master;
};
/**
 * Creates a driver for the given endpoint and stores it as the master driver.
 *
 * The bolt port comes from Neo4jConfigurator; the credentials are the
 * module-level `userName` / `password`.
 *
 * NOTE(review): a previously assigned master driver is simply replaced here,
 * it is not closed - confirm that this is intended.
 *
 * @param {string} endPoint - Host part of the bolt URI.
 */
Neo4jSession.prototype.setMasterDriver = function (endPoint) {
    var port = Neo4jConfigurator.instance().getBoltPort();
    var boltUri = "bolt://" + endPoint + ":" + port;
    var credentials = neo4j.auth.basic(userName, password);
    var driverOptions = { encrypted: false, connectionPoolSize: 10 };

    this.masterDriver = neo4j.driver(boltUri, credentials, driverOptions);
};
/**
 * Builds a driver for the address returned by
 * Neo4jConfigurator.getDBURlIndexer().
 *
 * Despite the "set" in its name this method does not assign anything on the
 * instance; the constructor stores the returned driver in `defaultDriver`.
 *
 * @returns {Object} The newly created driver.
 */
Neo4jSession.prototype.setDefaultDriver = function () {
    var boltUri = "bolt://" + Neo4jConfigurator.instance().getDBURlIndexer();
    var credentials = neo4j.auth.basic(userName, password);

    return neo4j.driver(boltUri, credentials, { encrypted: false, connectionPoolSize: 10 });
};
/**
 * Accessor for the default driver.
 *
 * @returns {*} The value stored in `defaultDriver` by the constructor.
 */
Neo4jSession.prototype.getDefaultDriver = function () {
    var fallbackDriver = this.defaultDriver;
    return fallbackDriver;
};
/**
 * Opens a new session on the default driver.
 *
 * The session is only created and returned here; it is not added to the
 * active-session registry by this method.
 *
 * @returns {Object} The session returned by the default driver's `session()`.
 */
Neo4jSession.prototype.addDefaultSession = function () {
    return this.getDefaultDriver().session();
};
/**
 * Runs a Cypher query on a freshly opened default-driver session.
 *
 * The new session is registered via addActiveSession() before the query is
 * issued. NOTE(review): nothing in this method closes the session; it stays
 * open until closeAllSessions() is called.
 *
 * @param {string} cypherQuery - Query text handed to `session.run()`.
 * @returns {*} Whatever `session.run()` returns.
 */
Neo4jSession.prototype.run = function (cypherQuery) {
    var freshSession = this.addDefaultSession();
    this.addActiveSession(freshSession);

    var result = freshSession.run(cypherQuery);
    return result;
};
/**
 * Runs a Cypher query on a freshly opened session of the master driver.
 *
 * The session is registered via addActiveSession() before the query is
 * issued. The master driver must already have been assigned (it is `null`
 * after construction), otherwise the `session()` call throws.
 *
 * @param {string} cypherQuery - Query text handed to `session.run()`.
 * @returns {*} Whatever `session.run()` returns.
 */
Neo4jSession.prototype.runMasterSession = function (cypherQuery) {
    var sessionOnMaster = this.getMasterDriver().session();
    this.addActiveSession(sessionOnMaster);

    var result = sessionOnMaster.run(cypherQuery);
    return result;
};
/**
 * Closes every session in the active-session registry and removes each one
 * from the registry right after its `close()` call.
 *
 * If a `close()` call throws, the exception propagates and the remaining
 * sessions are left in the registry.
 */
Neo4jSession.prototype.closeAllSessions = function () {
    var registry = this.getActiveSessions();
    // Snapshot the ids first so that deleting entries does not disturb the walk.
    var ids = Object.keys(registry);

    for (var i = 0; i < ids.length; i++) {
        var id = ids[i];
        registry[id].close();
        delete registry[id];
    }
};
/**
 * Accessor for the session registry.
 *
 * NOTE(review): this file assigns `getActiveSessions` twice with the same
 * body; this later assignment is the one that ends up on the prototype.
 *
 * @returns {Object} The live `activeSessions` object (not a copy).
 */
Neo4jSession.prototype.getActiveSessions = function () {
    var registry = this.activeSessions;
    return registry;
};
/**
 * Adds a session to the active-session registry so that closeAllSessions()
 * can close it later.
 *
 * The id starts at "number of registered sessions + 1", which yields
 * 1, 2, 3, ... for a gap-free registry. Because the registry object is
 * handed out by getActiveSessions(), entries may be removed individually;
 * in that case the computed id can already be taken, so it is advanced until
 * a free one is found instead of overwriting (and losing track of) a
 * session that is still registered.
 *
 * @param {Object} session - Session to track.
 */
Neo4jSession.prototype.addActiveSession = function (session) {
    var registry = this.getActiveSessions();
    var sessionId = Object.keys(registry).length + 1;

    while (Object.prototype.hasOwnProperty.call(registry, sessionId)) {
        sessionId += 1;
    }

    registry[sessionId] = session;
};
/**
 * Finds the master among the configured endpoints and points the master
 * driver at it.
 *
 * Each endpoint from Neo4jConfigurator.getEndPoints() is probed, one after
 * another, with an HTTP GET on `http://<endPoint>:<httpPort><masterRoute>`.
 * An endpoint counts as master when its response body (NUL characters
 * replaced by spaces) JSON-parses to `true`; setMasterDriver() is then called
 * for it and the remaining endpoints are not probed. A failed request whose
 * `error` property is the string 'false' is treated as "not the master".
 * Any other failure is remembered and probing continues with the next
 * endpoint.
 *
 * @returns {Promise<string>} Resolves with the master endpoint. Rejects with
 *     the first unexpected probe error when no master was found, or with a
 *     plain Error when every endpoint answered "not master" (or no endpoints
 *     are configured), so the promise always settles.
 */
Neo4jSession.prototype.initMaster = function () {
    var neo4jSessionObj = this;
    var masterEndPoint = null;
    var firstError = null;

    // Start inside a promise chain so that a throwing configurator call
    // surfaces as a rejection instead of a synchronous exception.
    return Promise.resolve().then(function () {
        var configurator = Neo4jConfigurator.instance();
        var masterRoute = configurator.getMasterRoute();
        var httpPort = configurator.getHttpPort();

        return Promise.each(configurator.getEndPoints(), function (endPoint) {
            if (masterEndPoint !== null) {
                // Master already found; skip the remaining endpoints.
                return undefined;
            }

            // Declared locally: as implicit globals these would throw in
            // strict mode and be shared between probes otherwise.
            var options = {
                uri: 'http://' + endPoint + ":" + httpPort + masterRoute,
                method: "GET",
                headers: {
                    'Accept-Charset': 'utf-8'
                }
            };

            // Returning the chain makes Promise.each wait for this probe
            // before it moves on to the next endpoint.
            return requestPromise(options)
                .then(function (body) {
                    var json = JSON.parse(body.replace(/\0/g, ' '));
                    if (json === true) {
                        neo4jSessionObj.setMasterDriver(endPoint);
                        masterEndPoint = endPoint;
                    }
                })
                .catch(function (err) {
                    if (err && err.error === 'false') {
                        // This endpoint reported that it is not the master.
                        return;
                    }
                    if (firstError === null) {
                        firstError = err;
                    }
                });
        });
    }).then(function () {
        if (masterEndPoint !== null) {
            return masterEndPoint;
        }
        throw firstError !== null
            ? firstError
            : new Error('No Neo4j master found among the configured endpoints');
    });
};
// Public surface of this module: only the Neo4jSession constructor is exported.
module.exports = {
Neo4jSession: Neo4jSession
}
答
这段代码很长(而且有重复),究竟是哪里不起作用?
您不需要保留会话,您可以在查询运行后丢弃它们。
同样,在同一个会话/事务中,您可以使用 async.serial 逐个运行多个查询。或者,如果您想批量查询/更新,可以用
UNWIND {data} as row MATCH ...
作为查询的开头,并传入一个形如 {data:[ {foo:bar}, {foo:bar2}, ...]} 的
参数,其中可以包含例如 1 万个对象。
你能分别给出这两种解决方案的示例吗? –