同步logstash JDBC输入
问题描述:
我使用了多个logstash JDBC输入:
# Logstash jdbc input #1: reads its SQL from myPath/queryA.sql, connects to
# Oracle through the ojdbc7 driver, and adds the field "tag" => "myFirstTag".
# NOTE(review): last_run_metadata_path is the same file in every jdbc input in
# this config; presumably the inputs overwrite each other's saved state —
# confirm, and consider one file per input.
# NOTE(review): tracking_column is set but use_column_value is not; per the
# plugin documentation tracking_column presumably has no effect on its own —
# verify.
jdbc {
jdbc_driver_library => "../vendor/oracle/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
# Retry a failed connection up to 10 times; the wait value is presumably in seconds — confirm.
connection_retry_attempts => 10
connection_retry_attempts_wait_time => 5
jdbc_validate_connection => "true"
jdbc_connection_string => "connectionString/myDataBase"
jdbc_user => "USER_NAME"
jdbc_password => "PASSWORD"
# Cron-style schedule; "* * * * *" means every minute. All jdbc inputs in this
# config use the same schedule and the same database user.
schedule => "* * * * *"
statement_filepath => "myPath/queryA.sql"
tracking_column => "myTrackingcolumn"
last_run_metadata_path => "myPath/.logstash_jdbc_last_run"
type => "documentType"
# Marks events from this input so they can be told apart from the other inputs.
add_field => {
"tag" => "myFirstTag"
}
}
# Logstash jdbc input #2: reads its SQL from myPath/queryB.sql, connects to
# Oracle through the ojdbc7 driver, and adds the field "tag" => "mySecondTag".
# NOTE(review): last_run_metadata_path is the same file in every jdbc input in
# this config; presumably the inputs overwrite each other's saved state —
# confirm, and consider one file per input.
# NOTE(review): tracking_column is set but use_column_value is not; per the
# plugin documentation tracking_column presumably has no effect on its own —
# verify.
jdbc {
jdbc_driver_library => "../vendor/oracle/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
# Retry a failed connection up to 10 times; the wait value is presumably in seconds — confirm.
connection_retry_attempts => 10
connection_retry_attempts_wait_time => 5
jdbc_validate_connection => "true"
jdbc_connection_string => "connectionString/myDataBase"
jdbc_user => "USER_NAME"
jdbc_password => "PASSWORD"
# Cron-style schedule; "* * * * *" means every minute. All jdbc inputs in this
# config use the same schedule and the same database user.
schedule => "* * * * *"
statement_filepath => "myPath/queryB.sql"
tracking_column => "myTrackingcolumn"
last_run_metadata_path => "myPath/.logstash_jdbc_last_run"
type => "documentType"
# Marks events from this input so they can be told apart from the other inputs.
add_field => {
"tag" => "mySecondTag"
}
}
# Logstash jdbc input #3: reads its SQL from myPath/queryC.sql, connects to
# Oracle through the ojdbc7 driver, and adds the field "tag" => "myThirdTag".
# NOTE(review): last_run_metadata_path is the same file in every jdbc input in
# this config; presumably the inputs overwrite each other's saved state —
# confirm, and consider one file per input.
# NOTE(review): tracking_column is set but use_column_value is not; per the
# plugin documentation tracking_column presumably has no effect on its own —
# verify.
jdbc {
jdbc_driver_library => "../vendor/oracle/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
# Retry a failed connection up to 10 times; the wait value is presumably in seconds — confirm.
connection_retry_attempts => 10
connection_retry_attempts_wait_time => 5
jdbc_validate_connection => "true"
jdbc_connection_string => "connectionString/myDataBase"
jdbc_user => "USER_NAME"
jdbc_password => "PASSWORD"
# Cron-style schedule; "* * * * *" means every minute. All jdbc inputs in this
# config use the same schedule and the same database user.
schedule => "* * * * *"
statement_filepath => "myPath/queryC.sql"
tracking_column => "myTrackingcolumn"
last_run_metadata_path => "myPath/.logstash_jdbc_last_run"
type => "documentType"
# Marks events from this input so they can be told apart from the other inputs.
add_field => {
"tag" => "myThirdTag"
}
}
由于我当前查询的数据库中定义了SESSIONS_PER_USER限制,这引发了以下错误:
[31mPipeline aborted due to error {:exception=>#<Sequel::DatabaseConnectionError: Java::JavaSql::SQLException: ORA-02391:
exceeded simultaneous SESSIONS_PER_USER limit>, :backtrace=>["oracle.jdbc.driver.T4CTTIoer.processError(oracle/jdbc/driver/T4CTTIoer.java:450)", "oracle.jdbc.driver.
T4CTTIoer.processError(oracle/jdbc/driver/T4CTTIoer.java:392)", "oracle.jdbc.driver.T4CTTIoer.processError(oracle/jdbc/driver/T4CTTIoer.java:385)",
"oracle.jdbc.driver.T4CTTIfun.processError(oracle/jdbc/driver/T4CTTIfun.java:938)", "oracle.dbc.driver.T4CTTIoauthenticate.processError(oracle/jdbc/driver/T4CTTIoauthenticate.java:480)",
"oracle.jdbc.driver.T4CTTIfun.receive(oracle/jdbc/driver/T4CTTIfun.java:655)", "oracle.jdbc.driver.T4CTTIfun.doRPC(oracle/jdbc/driver/T4CTTIfun.java:249)",
"oracle.jdbc.driver.T4CTTIoauthenticate.doOAUTH(oracle/jdbc/driver/T4CTTIoauthenticate.java:416)", "oracle.jdbc.driver.T4CTTIoauthenticate.doOAUTH(oracle/jdbc/driver/T4CTTIoauthenticate.java:825)",
"oracle.jdbc.driver.T4CConnection.logon(oracle/jdbc/driver/T4CConnection.java:596)", "oracle.jdbc.driver.PhysicalConnection.<init>(oracle/jdbc/driver/PhysicalConnection.java:715)",
"oracle.jdbc.driver.T4CConnection.<init>(oracle/jdbc/driver/T4CConnection.java:385)", "oracle.jdbc.driver.T4CDriverExtension.getConnection(oracle/jdbc/driver/T4CDriverExtension.java:30)",
"oracle.jdbc.driver.OracleDriver.connect(oracle/jdbc/driver/OracleDriver.java:564)", "RUBY.connect(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/adapters/jdbc.rb:222)",
"RUBY.make_new(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool.rb:110)", "RUBY.make_new(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:226)",
"RUBY.available(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:199)", "RUBY._acquire(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:135)",
"RUBY.acquire(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:149)", "RUBY.sync(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:280)",
"org.jruby.ext.thread.Mutex.synchronize(org/jruby/ext/thread/Mutex.java:149)", "RUBY.sync(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:280)",
"RUBY.acquire(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:148)", "RUBY.acquire(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/extensions/connection_validator.rb:98)",
"RUBY.hold(D:myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:106)", "RUBY.synchronize(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/database/connecting.rb:256)",
"RUBY.test_connection(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/database/connecting.rb:266)", "RUBY.prepare_jdbc_connection(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-3.1.0/lib/logstash/plugin_mixins/jdbc.rb:173)",
"RUBY.register(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-3.1.0/lib/logstash/inputs/jdbc.rb:187)", "RUBY.start_inputs(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:330)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)",
"RUBY.start_inputs(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:329)", "RUBY.start_workers(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:180)",
"RUBY.run(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:136)",
"RUBY.start_pipeline(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/agent.rb:473)"], :level=>:error}?[0mstopping pipeline {:id=>"main"}
如何配置这些输入使logstash按顺序执行SQL查询并避免超出允许的会话限制?
我认为这可以作为一个临时的解决方法,但不是一个永久的解决方案。我需要确保所有输入都被执行,并避免logstash实例崩溃。 crontab编辑器非常棒! – M3HD1
@Mehdi是的,我知道这只是一个解决方法。一个长期的解决方案是每个输入使用一个单独的数据库用户,或者提高会话限制(但在您的情况下可能无法实现)。另一种解决方案(但我不确定它是否有效):将工作线程(worker)数量和批量大小都设置为1(请参见[这里](https://www.elastic.co/guide/en/logstash/current/command-line-flags.html)),这样一次只能处理一个事件,从而可能阻塞输入([文档](https://www.elastic.co/guide/en/logstash/5.1/execution-model.html)对此说明不清楚) – baudsp
我尝试只用一个管道工作线程(pipeline worker)启动logstash实例,但这并不能解决问题。正如文档中提到的,管道工作线程似乎只执行过滤和输出阶段,而不执行输入!(文档原文:该选项设置并行执行管道的过滤和输出阶段的工作线程数量) – M3HD1