Using Flume to monitor Oracle for incremental table changes

The requirement: monitor, in real time, the operations an Oracle user performs (here, the SQL statements the user runs).

There is a project on GitHub, flume-ng-sql-source, that provides a Flume source which queries the database through Hibernate: https://github.com/keedio/flume-ng-sql-source

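Preparation: (1) The Oracle JDBC driver jar, ojdbc5.jar (look in the jdbc/lib directory under the Oracle installation directory).

(2) The flume-ng-sql-source-1.4.3.jar plugin, which you can get by building the GitHub project or download directly from CSDN. Put both jars into Flume's lib directory.

(3) The Flume configuration file: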

a1.sources = sourceOne
a1.sinks = sinkOne
a1.channels = channelOne

a1.sources.sourceOne.type = org.keedio.flume.source.SQLSource
a1.sources.sourceOne.hibernate.connection.url = jdbc:oracle:thin:@10.150.**.**:1521/orcl
a1.sources.sourceOne.hibernate.connection.user = wangxiaoming
a1.sources.sourceOne.hibernate.connection.password = ***
a1.sources.sourceOne.hibernate.connection.autocommit = true
a1.sources.sourceOne.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
a1.sources.sourceOne.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
a1.sources.sourceOne.run.query.delay = 10000
a1.sources.sourceOne.status.file.path = /tmp
a1.sources.sourceOne.status.file.name = sqlSource.status

a1.sources.sourceOne.start.from = 20190312000000000000
a1.sources.sourceOne.custom.query = select SQL_TEXT,LAST_ACTIVE_TIME from v$sqlarea a where a.PARSING_SCHEMA_NAME in ('WANGXIAOMING') order by a.LAST_ACTIVE_TIME desc
a1.sources.sourceOne.batch.size = 1000
a1.sources.sourceOne.max.rows = 1000
a1.sources.sourceOne.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.sourceOne.hibernate.c3p0.min_size = 1
a1.sources.sourceOne.hibernate.c3p0.max_size = 10

a1.sinks.sinkOne.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sinkOne.brokerList = 10.150.**.**:9092
a1.sinks.sinkOne.topic = realtime-monitor
a1.sinks.sinkOne.batchSize = 5
a1.sinks.sinkOne.requiredAcks = 1

a1.channels.channelOne.type = memory
a1.channels.channelOne.capacity = 10000
a1.channels.channelOne.transactionCapacity = 10000
a1.channels.channelOne.byteCapacityBufferPercentage = 20
a1.channels.channelOne.byteCapacity = 800000

a1.sources.sourceOne.channels = channelOne
a1.sinks.sinkOne.channel = channelOne
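Before starting the agent, a few of the settings above have to agree with each other: each source and sink must name a declared channel, the memory channel's transactionCapacity may not exceed its capacity (Flume rejects that configuration), and a source or sink batch larger than transactionCapacity will not fit into one channel transaction. The Python sketch below is a hypothetical helper, not part of Flume or the plugin, that checks these rules for a config like this one; parse_properties and check_agent are names made up for illustration, and the fallback of 100 is the memory channel's documented default.

```python
# Hypothetical helper (not part of Flume or flume-ng-sql-source): reads an
# agent config such as sql.conf and reports a few common wiring and
# capacity mistakes.

def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_agent(props, agent="a1"):
    """Return a list of problems; an empty list means nothing was found."""
    problems = []
    declared = set(props.get(f"{agent}.channels", "").split())

    def tx_capacity(ch):
        # The memory channel defaults transactionCapacity to 100.
        return int(props.get(f"{agent}.channels.{ch}.transactionCapacity", "100"))

    # capacity must be at least transactionCapacity (default capacity: 100).
    for ch in declared:
        cap = int(props.get(f"{agent}.channels.{ch}.capacity", "100"))
        if tx_capacity(ch) > cap:
            problems.append(f"channel {ch}: transactionCapacity > capacity")

    # Each source must write to declared channels, in batches that fit.
    for src in props.get(f"{agent}.sources", "").split():
        batch = props.get(f"{agent}.sources.{src}.batch.size")
        for ch in props.get(f"{agent}.sources.{src}.channels", "").split():
            if ch not in declared:
                problems.append(f"source {src}: channel {ch!r} not declared")
            elif batch and int(batch) > tx_capacity(ch):
                problems.append(f"source {src}: batch.size > {ch} transactionCapacity")

    # Each sink must read from a declared channel, in batches that fit.
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        batch = props.get(f"{agent}.sinks.{sink}.batchSize")
        if ch not in declared:
            problems.append(f"sink {sink}: channel {ch!r} not declared")
        elif batch and int(batch) > tx_capacity(ch):
            problems.append(f"sink {sink}: batchSize > {ch} transactionCapacity")

    return problems


# The relevant lines from the configuration above.
EXAMPLE = """
a1.sources = sourceOne
a1.sinks = sinkOne
a1.channels = channelOne
a1.sources.sourceOne.batch.size = 1000
a1.sinks.sinkOne.batchSize = 5
a1.channels.channelOne.capacity = 10000
a1.channels.channelOne.transactionCapacity = 10000
a1.sources.sourceOne.channels = channelOne
a1.sinks.sinkOne.channel = channelOne
"""

if __name__ == "__main__":
    print(check_agent(parse_properties(EXAMPLE)) or "no problems found")
```

For the configuration above this prints "no problems found"; lowering transactionCapacity below the source's batch.size of 1000 would be reported.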

Start Flume (with the configuration above saved as $FLUME_HOME/conf/sql.conf): flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/sql.conf -Dflume.root.logger=INFO,console

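Start a Kafka console consumer on the topic: kafka-console-consumer.sh --zookeeper hadoop03.010150010152.yz:2181 --topic realtime-monitor (the --zookeeper option only exists in older Kafka releases; newer ones take --bootstrap-server with a broker address instead)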
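Each record the consumer prints is the body of one Flume event. Assuming the plugin writes every result row as one CSV line with double-quoted fields (our understanding of its default output; compare with the messages you actually receive), a consumer program can split a body back into SQL_TEXT and LAST_ACTIVE_TIME with Python's csv module, which copes with commas and doubled quotes inside the SQL text. parse_event is a hypothetical helper, and the delimiter parameter is there only in case your plugin version is configured with a different field separator.

```python
import csv
import io


def parse_event(body, delimiter=","):
    """Decode one message body (bytes) and split it into its CSV fields.

    Quoted fields may contain the delimiter (common in SQL_TEXT) and
    doubled quotes (""), which the csv module unescapes.
    """
    text = body.decode("utf-8")
    rows = list(csv.reader(io.StringIO(text), delimiter=delimiter))
    if len(rows) != 1:
        raise ValueError(f"expected one row, got {len(rows)}")
    return rows[0]


if __name__ == "__main__":
    sql_text, last_active = parse_event(b'"select a, b from t","2019-03-12 10:00:00.0"')
    print(sql_text, "|", last_active)
```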

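The Kafka consumer then receives the user's SQL records. (To update incrementally by timestamp, change the SQL logic: have the query determine the current time window and compare it with the table's LAST_ACTIVE_TIME.)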
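The timestamp filtering suggested in the parenthetical above can be sketched in Python on in-memory sample rows rather than a live Oracle connection. rows_in_window keeps rows whose LAST_ACTIVE_TIME falls within the last poll interval (the author's idea, with the 10-second window taken from run.query.delay = 10000), while rows_after_watermark keeps rows newer than the last value already seen, which avoids gaps when a poll runs late. WINDOW_SQL is a hypothetical Oracle rewrite of the custom query that has not been run against a database, and all function names are made up for illustration.

```python
from datetime import datetime, timedelta

# Mirrors run.query.delay = 10000 (milliseconds) in the config above.
POLL_DELAY = timedelta(milliseconds=10000)

# Hypothetical Oracle version of the window filter. Oracle DATE arithmetic
# is in days, so SYSDATE - 10/86400 means "10 seconds ago".
WINDOW_SQL = (
    "select LAST_ACTIVE_TIME, SQL_TEXT from v$sqlarea a "
    "where a.PARSING_SCHEMA_NAME in ('WANGXIAOMING') "
    "and a.LAST_ACTIVE_TIME > SYSDATE - 10/86400 "
    "and a.LAST_ACTIVE_TIME <= SYSDATE "
    "order by a.LAST_ACTIVE_TIME"
)


def rows_in_window(rows, now, delay=POLL_DELAY):
    """Keep (sql_text, last_active_time) rows with now - delay < t <= now."""
    start = now - delay
    return [r for r in rows if start < r[1] <= now]


def rows_after_watermark(rows, watermark):
    """Keep rows newer than the watermark, oldest first, and return them
    together with the new watermark (the largest time returned)."""
    fresh = sorted((r for r in rows if r[1] > watermark), key=lambda r: r[1])
    new_mark = fresh[-1][1] if fresh else watermark
    return fresh, new_mark


if __name__ == "__main__":
    now = datetime(2019, 3, 12, 10, 0, 0)
    sample = [
        ("select 1 from dual", now - timedelta(seconds=15)),
        ("select 2 from dual", now - timedelta(seconds=5)),
        ("select 3 from dual", now),
    ]
    print(rows_in_window(sample, now))
    print(rows_after_watermark(sample, now - timedelta(seconds=10)))
```

According to the flume-ng-sql-source README, the plugin can also keep such a watermark itself: the incremental column goes first in custom.query, the special string $@$ in the WHERE clause is replaced with the last stored value, and that value is persisted in the status file, with start.from as the initial value. The query in the configuration above puts SQL_TEXT first, so it would need reordering for that mode; check the README of your plugin version for the exact rules.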
