logstash来实现全量和增量将mysql数据导入到es
1 简介
本文说明使用logstash来实现全量和增量将mysql数据导入到es中。每一分钟执行一次。有时效性高的要求可以提高频率。
2.logstash同步mysql数据到elasticsearch
logstash-plugin install logstash-output-elasticsearch logstash-plugin install logstash-input-jdbc
将mysql-connector-java-8.0.11.jar copy到logstash/bin/mysql目录下
2.1 时间维度
1.以一定的时间间隔定时更新mysql数据到es。
2.配置如下
全量构建索引
(1)jdbc.conf
input { jdbc { jdbc_default_timezone => "Asia/Shanghai" # mysql 数据库链接,school-edu为数据库名 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/school-edu?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull" # 用户名和密码 jdbc_user => "xxxxx" jdbc_password => "xxxxx" # 驱动 jdbc_driver_library => "E:\linux\ELK\6.8.2\logstash\bin\mysql\mysql-connector-java-8.0.18.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 执行的sql 文件路径+名称 statement_filepath => "E:\linux\ELK\6.8.2\logstash\config\mysql\jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" } } output { elasticsearch { # ES的IP地址及端口 hosts => ["localhost:9200"] # 索引名称 index => "teacher" user => "elastic" password => "changeme" document_type => "_doc" # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } stdout { # JSON格式输出 codec => json_lines } }
(2)jdbc.sql
select id, `name`, intro, career, avatar, sort, upd_time from edu_teacher
增量构建索引
(1)jdbc-update.conf
input { jdbc { jdbc_default_timezone => "Asia/Shanghai" # mysql 数据库链接,school-edu为数据库名 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/school-edu?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull" # 用户名和密码 jdbc_user => "xxxxxx" jdbc_password => "xxxxx" # 驱动 jdbc_driver_library => "E:\linux\ELK\6.8.2\logstash\bin\mysql\mysql-connector-java-8.0.18.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" last_run_metadata_path => "E:\linux\ELK\6.8.2\logstash\config\mysql\last_value_meta.txt" # 执行的sql 文件路径+名称 statement_filepath => "E:\linux\ELK\6.8.2\logstash\config\mysql\jdbc-update.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" } } output { elasticsearch { # ES的IP地址及端口 hosts => ["localhost:9200"] # 索引名称 index => "teacher" user => "elastic" password => "changeme" document_type => "_doc" # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } stdout { # JSON格式输出 codec => json_lines } }
(2)jdbc-update.sql
select id, `name`, intro, career, avatar, sort, upd_time from edu_teacher where upd_time >= :sql_last_value
(3)last_value_meta.txt
2020-01-29 00:00:00
全量构建
logstash -f E:\linux\ELK\6.8.2\logstash\config\mysql\jdbc.conf
增量构建
logstash -f E:\linux\ELK\6.8.2\logstash\config\mysql\jdbc-update.conf