logstash把mysql数据导入elasticsearch中并设置分词器
1.ElasticSearch安装:
ElasticSearch 的下载地址:https://www.elastic.co/downloads/elasticsearch;
下载好之后将其解压到你想要安装的目录:比如我的 D:\chengxu\ElasticSearch\elasticsearch-6.3.0
以上,就算安装好了,运行一下。
进入到D:\chengxu\ElasticSearch\elasticsearch-6.3.0 \bin中,双击执行 elasticsearch.bat 。等待打印信息输出完之后打开浏览器,输入:localhost:9200 。安装成功后页面显示如下。
2. ElasticSearch-head的安装:
这里我通过一个可视化的工具来查看ES的运行状态和数据,es-head 。
ElasticSearch-head 依赖于node.js
安装node.js
node.js下载地址。
http://nodejs.cn/download/;
下载后,直接打开后除了安装路径自己按需设置外,其他的一路next,直到最后它自动安装完。最后打开cmd 。输入:node --version 看能否打印出版本信息来检验安装是否正确就好了。
node.js安装好后,切换目录到node.js的安装目录中。比如我的是D:\chengxu\tools\node-js
npm install -g grunt-cli
head 依赖的都安装完之后,安装head 。
head的下载地址:
https://github.com/mobz/elasticsearch-head
进入github后选择下载zip,下载完之后将其解压到你想要安装的目录,E:\ruanjiangongju\windowsban-es\chajian\
在E:\ruanjiangongju\windowsban-es\chajian\elasticsearch-head-master 中打开cmd,执行
npm install ,将该目录下的相关文件解压并安装,之后启动执行:npm run start
打开浏览器输入:http://localhost:9100/
发现连接状态还是未连接,因为es默认是不允许跨域连接需要开启。进入到es的安装目录下,
我这里是E:\ruanjiangongju\windowsban-es\elasticsearch-6.7.0 ,进入config 文件夹,打开elasticsearch.yml 在末尾添加
http.cors.enabled: true
http.cors.allow-origin: "*"
保存,然后重启es ,再刷新一下 elasticsearch-head
http.cors.enabled 开启跨域访问支持,默认为false
http.cors.allow-origin 跨域访问允许的域名地址,支持用正则,*表示全部
3. mysql数据增量同步到es
mysql数据同步到es有多种方法,这里我使用logstash做增量同步;
下载logstash: https://www.elastic.co/fr/downloads/logstash
解压到自定义目录,我的是:E:\ruanjiangongju\logstash\logstash-6.7.0
为便于测试先在mysql中创建一张用于测试的表;
然后再logstash的config目录下创建一个test.conf文件
test.conf内容有:
input {
stdin {
}
jdbc {
type => "testimport"
jdbc_connection_string => "jdbc:mysql://192.168.147.128:3306/leader_slave_test"
jdbc_user => "root"
jdbc_password => "****自己的密码"
#开启记录最后一次运行的结果
record_last_run => true
#设置可以字段追踪
use_column_value => true
#指定数据库中用于追踪的字段
tracking_column => "id"
#追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
tracking_column_type => "numeric"
#自定义的文件路径,用于存放追踪字段的最后导入的值
last_run_metadata_path => "/ruanjiangongju/logstash/logstash-6.7.0/data/jdbc-position.txt"
clean_run => "false"
#前提是在这个目录下放入mysql-connector-java-5.1.32.jar
jdbc_driver_library => "/ruanjiangongju/logstash/logstash-6.7.0/mysql/mysql-connector-java-5.1.32.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => true
jdbc_page_size => "500"
statement => "select * from test where id > :sql_last_value"
schedule => "* * * * *"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
date {
match => ["timestamp","dd/MM/yyyy:HH:mm:ss Z"]
}
}
output {
elasticsearch {
#自己的es地址
hosts => ["http://localhost:9200"]
index => "testimport"
document_type => "testimport"
}
stdout {
codec => json_lines
}
}
进入logstash的bin目录打开cmd执行命令:
logstash.bat -f /ruanjiangongju/logstash/logstash-6.7.0/config/test.conf
看到此成功;
刷新es-head:
在test表中新增数据,等一会再次刷新es_head可见可以增量同步成功;但是这个工具有弊端要是想实现修改和删除同步需要代码中实现;
4. 在导入时配置默认使用ik分词器:
ik分词器下载地址:https://github.com/medcl/elasticsearch-analysis-ik/releases
下载完成后解压到es的plugins中:我的是在E:\ruanjiangongju\windowsban-es\elasticsearch-6.7.0\plugins中;
然后在E:\ruanjiangongju\logstash\logstash-6.7.0目录下创建文件夹templete,接着再创建一个json文件如下:
{
"template": "*",
"version": 00001,
"settings": {
"index.refresh_interval": "5s"
},
"mappings": {
"_default_": {
"_all": {
"enabled": true,
"norms": false
},
"dynamic_templates": [
{
"message_field": {
"path_match": "message",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false
}
}
},
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false,
"analyzer": "ik_max_word",#只需要添加这一行即可设置分词器为ik_max_word
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
],
"properties": {
"@timestamp": {
"type": "date",
"include_in_all": false
},
"@version": {
"type": "keyword",
"include_in_all": false
}
}
}
}
}
然后更改test.conf文件如下:
input {
stdin {
}
jdbc {
type => "testimport"
jdbc_connection_string => "jdbc:mysql://192.168.147.128:3306/leader_slave_test"
jdbc_user => "root"
jdbc_password => "****自己的密码"
#开启记录最后一次运行的结果
record_last_run => true
#设置可以字段追踪
use_column_value => true
#指定数据库中用于追踪的字段
tracking_column => "id"
#追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
tracking_column_type => "numeric"
#自定义的文件路径,用于存放追踪字段的最后导入的值
last_run_metadata_path => "/ruanjiangongju/logstash/logstash-6.7.0/data/jdbc-position.txt"
clean_run => "false"
#前提是在这个目录下放入mysql-connector-java-5.1.32.jar
jdbc_driver_library => "/ruanjiangongju/logstash/logstash-6.7.0/mysql/mysql-connector-java-5.1.32.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => true
jdbc_page_size => "500"
statement => "select * from test where id > :sql_last_value"
schedule => "* * * * *"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
date {
match => ["timestamp","dd/MM/yyyy:HH:mm:ss Z"]
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "testimport"
document_type => "testimport"
#配置模板文件
template_overwrite => true
template => "/ruanjiangongju/logstash/logstash-6.7.0/templete/logstash.json"
}
stdout {
codec => json_lines
}
}
启动logstash后再test表中新增数据:
然后在es-head中查询ik是否生效:
成功!!!