ElasticSearch-logstash数据同步
1.4 安装logstash
1.4.1 安装logstash-input-jdbc插件
(1)拷贝安装包到/home/tdx200/es/目录下
(2)直接解压包logstash-5.2.0.tar.gz
[[email protected] es]# tar -zvf logstash-5.2.0.tar.gz
(3)切换目录
[[email protected] es]# cd logstash-5.2.0/bin/
(4).安装jdbc
[[email protected] bin]# logstash-plugin install logstash-input-jdbc
(5).查看可用插件列表命令用:
[[email protected] bin]# logstash-plugin list
1.4.2 mysql存在的数据库及表
1.4.3 配置logstash
新建两个文件:
(1).在logstash/bin/目录下新建文件elkdb_data_import.conf
----------------------------start---------------------------------
input {
stdin {
}
jdbc {
# 数据库地址端口和数据库名称
jdbc_connection_string => "jdbc:mysql://192.168.2.66:3306/esdb"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "123456"
# mysql驱动
jdbc_driver_library => "/home/tdx200/es/mysql-connector-java-5.1.40-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
#处理中文乱码问题
codec => plain { charset => "UTF-8"}
#使用其它字段追踪,而不是用时间
use_column_value => true
#追踪的字段
tracking_column => id
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/home/tdx200/es/logstash-5.2.0/bin/station_parameter.txt"
#开启分页查询
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
statement_filepath => "/home/tdx200/es/logstash-5.2.0/bin/elkdb_data_import.sql"
#定时各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新,不支持60s以内的秒级更新。
schedule => "* * * * *"
#设定ES索引类型
type => "user"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if[type]=="user"{
elasticsearch {
#ESIP地址与端口
hosts => ["127.0.0.1:9200"]
#ES索引名称(自己定义的)
index => "elkdb"
#自增ID编号
document_id => "%{rec_id}"
document_type => "user"
workers => 1
flush_size => 20000
idle_flush_time => 10
template_overwrite => true
}
stdout {
#以JSON格式输出
codec => json_lines
}
}
}
--------------------------------------------------------------------------------------------------------------------------
//是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => true
/
//是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value => true
//如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的.比如:ID.
t
tracking_column => MY_ID
/
//指定文件,来记录上次执行到的 tracking_column 字段的值
/
//比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.
/
//我们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值(10000).
l
last_run_metadata_path => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\station_parameter.txt"
//是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
c
clean_run => false
/
//是否将 column 名称转小写
l
lowercase_column_names => false
/
//存放需要执行的 SQL 语句的文件位置
s
statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"
这个文件里记录上次执行到的tracking_column 字段的值,比如上次数据库有10000 条记录,查询完后该文件中就会有数字10000 这样的记录,
下次执行SQL 查询可以从10001 条处开始,我们只需要在SQL 语句中WHERE MY_ID > :last_sql_value 即可. 其中:last_sql_value 取得就是该文件中的值。
--------------------------------------------------------------------------------------------------------------------------
----------------------------end---------------------------------
(2).在logstash/bin/目录下新建文件elkdb_data_import.sql
----------------------------start---------------------------------
select * from `411100001`
----------------------------end---------------------------------
1.4.4 启动同步
(1).开启同步
[[email protected] bin]# bin/logstash -f jdbc.conf
(2)同步后的数据效果图:
1.4.5 增量实时同步
logstash-input-jdbc插件可以实现insert,update 的同步更新,但是不能实现delete操作,解决方案是,
新增一个flag 列,标识记录是否已经被删除,这样,相同的记录也会存在于Elasticsearch。可以执行简单的term查询操作,检索出已经删除的数据信息。
其次,若需要执行cleanup清理数据操作(物理删除),只需要在数据库和ES中同时删除掉标记位deleted的记录即可。
如:
mysql执行:delete from cc where cc.flag=’deleted’;
ES同样执行对应删除操作。