logstash把mysql数据导入elasticsearch中并设置分词器

1.ElasticSearch安装:

ElasticSearch 的下载地址:https://www.elastic.co/downloads/elasticsearch;

下载好之后将其解压到你想要安装的目录:比如我的 D:\chengxu\ElasticSearch\elasticsearch-6.3.0

以上,就算安装好了,运行一下。

进入到D:\chengxu\ElasticSearch\elasticsearch-6.3.0 \bin中,双击执行 elasticsearch.bat 。等待打印信息输出完之后打开浏览器,输入:localhost:9200 。安装成功后页面显示如下。

logstash把mysql数据导入elasticsearch中并设置分词器

 

2. ElasticSearch-head的安装

这里我通过一个可视化的工具来查看ES的运行状态和数据,es-head 。

ElasticSearch-head 依赖于node.js

安装node.js

node.js下载地址。

http://nodejs.cn/download/;

下载后,直接打开后除了安装路径自己按需设置外,其他的一路next,直到最后它自动安装完。最后打开cmd 。输入:node --version 看能否打印出版本信息来检验安装是否正确就好了。

node.js安装好后,切换目录到node.js的安装目录中。比如我的是D:\chengxu\tools\node-js

npm install -g grunt-cli

head 依赖的都安装完之后,安装head 。

head的下载地址:

https://github.com/mobz/elasticsearch-head

进入github后选择下载zip,下载完之后将其解压到你想要安装的目录,E:\ruanjiangongju\windowsban-es\chajian\

 

在E:\ruanjiangongju\windowsban-es\chajian\elasticsearch-head-master 中打开cmd,执行

npm install ,将该目录下的相关文件解压并安装,之后启动执行:npm run start

打开浏览器输入:http://localhost:9100/

logstash把mysql数据导入elasticsearch中并设置分词器

发现连接状态还是未连接,因为es默认是不允许跨域连接需要开启。进入到es的安装目录下,

我这里是E:\ruanjiangongju\windowsban-es\elasticsearch-6.7.0 ,进入config 文件夹,打开elasticsearch.yml 在末尾添加

http.cors.enabled: true

http.cors.allow-origin: "*"

保存,然后重启es ,再刷新一下 elasticsearch-head

http.cors.enabled 开启跨域访问支持,默认为false

http.cors.allow-origin 跨域访问允许的域名地址,支持用正则,*表示全部

3. mysql数据增量同步到es

mysql数据同步到es有多种方法,这里我使用logstash做增量同步;

下载logstash: https://www.elastic.co/fr/downloads/logstash

解压到自定义目录,我的是:E:\ruanjiangongju\logstash\logstash-6.7.0

为便于测试先在mysql中创建一张用于测试的表;

logstash把mysql数据导入elasticsearch中并设置分词器

然后再logstash的config目录下创建一个test.conf文件

logstash把mysql数据导入elasticsearch中并设置分词器

 

test.conf内容有:

input {
  stdin {
  }
  jdbc {
	  type => "testimport"
	  jdbc_connection_string => "jdbc:mysql://192.168.147.128:3306/leader_slave_test"
	  jdbc_user => "root"
	  jdbc_password => "****自己的密码"
          #开启记录最后一次运行的结果
	  record_last_run => true
          #设置可以字段追踪
	  use_column_value => true
          #指定数据库中用于追踪的字段
	  tracking_column => "id"
          #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
	  tracking_column_type => "numeric"
          #自定义的文件路径,用于存放追踪字段的最后导入的值
	  last_run_metadata_path => "/ruanjiangongju/logstash/logstash-6.7.0/data/jdbc-position.txt"
	  clean_run => "false"
          #前提是在这个目录下放入mysql-connector-java-5.1.32.jar
	  jdbc_driver_library => "/ruanjiangongju/logstash/logstash-6.7.0/mysql/mysql-connector-java-5.1.32.jar"
	  jdbc_driver_class => "com.mysql.jdbc.Driver"
	  jdbc_paging_enabled => true
	  jdbc_page_size => "500"
	  statement => "select * from test where id > :sql_last_value"
	  schedule => "* * * * *"
  }
}
filter {
	json {
		source => "message"
		remove_field => ["message"]
	}
	date {
		match => ["timestamp","dd/MM/yyyy:HH:mm:ss Z"]
	}
}
output { 
	elasticsearch { 
               #自己的es地址
		hosts => ["http://localhost:9200"] 
		index => "testimport" 
		document_type => "testimport"
	} 
	stdout {
		codec => json_lines 
	} 
}

进入logstash的bin目录打开cmd执行命令:

logstash.bat -f /ruanjiangongju/logstash/logstash-6.7.0/config/test.conf

logstash把mysql数据导入elasticsearch中并设置分词器

logstash把mysql数据导入elasticsearch中并设置分词器

看到此成功;

刷新es-head:

logstash把mysql数据导入elasticsearch中并设置分词器

在test表中新增数据,等一会再次刷新es_head可见可以增量同步成功;但是这个工具有弊端要是想实现修改和删除同步需要代码中实现;

4. 在导入时配置默认使用ik分词器:

ik分词器下载地址:https://github.com/medcl/elasticsearch-analysis-ik/releases

下载完成后解压到es的plugins中:我的是在E:\ruanjiangongju\windowsban-es\elasticsearch-6.7.0\plugins中;

然后在E:\ruanjiangongju\logstash\logstash-6.7.0目录下创建文件夹templete,接着再创建一个json文件如下:

{
    "template": "*",
    "version": 00001,
    "settings": {
        "index.refresh_interval": "5s"
    },
    "mappings": {
        "_default_": {
            "_all": {
                "enabled": true,
                "norms": false
            },
            "dynamic_templates": [
                {
                    "message_field": {
                        "path_match": "message",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "text",
                            "norms": false
                        }
                    }
                },
                {
                    "string_fields": {
                        "match": "*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "text",
                            "norms": false,
                            "analyzer": "ik_max_word",#只需要添加这一行即可设置分词器为ik_max_word
                            "fields": {
                                "keyword": {
                                    "type": "keyword"
                                }
                            }
                        }
                    }
                }
            ],
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "include_in_all": false
                },
                "@version": {
                    "type": "keyword",
                    "include_in_all": false
                }
            }
        }
    }
}

然后更改test.conf文件如下:

input {
  stdin {
  }
  jdbc {
	  type => "testimport"
	  jdbc_connection_string => "jdbc:mysql://192.168.147.128:3306/leader_slave_test"
	  jdbc_user => "root"
	  jdbc_password => "****自己的密码"
	  #开启记录最后一次运行的结果
	  record_last_run => true
	  #设置可以字段追踪
	  use_column_value => true
	  #指定数据库中用于追踪的字段
	  tracking_column => "id"
	  #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
	  tracking_column_type => "numeric"
	  #自定义的文件路径,用于存放追踪字段的最后导入的值
	  last_run_metadata_path => "/ruanjiangongju/logstash/logstash-6.7.0/data/jdbc-position.txt"
	  clean_run => "false"
	  #前提是在这个目录下放入mysql-connector-java-5.1.32.jar
	  jdbc_driver_library => "/ruanjiangongju/logstash/logstash-6.7.0/mysql/mysql-connector-java-5.1.32.jar"
	  jdbc_driver_class => "com.mysql.jdbc.Driver"
	  jdbc_paging_enabled => true
	  jdbc_page_size => "500"
	  statement => "select * from test where id > :sql_last_value"
	  schedule => "* * * * *"
  }
}
filter {
	json {
		source => "message"
		remove_field => ["message"]
	}
	date {
		match => ["timestamp","dd/MM/yyyy:HH:mm:ss Z"]
	}
}
output { 
	elasticsearch { 
		hosts => ["http://localhost:9200"] 
		index => "testimport" 
		document_type => "testimport"
		#配置模板文件
		template_overwrite => true
                template => "/ruanjiangongju/logstash/logstash-6.7.0/templete/logstash.json"
	} 
	stdout {
		codec => json_lines 
	} 
}

启动logstash后再test表中新增数据:

logstash把mysql数据导入elasticsearch中并设置分词器

然后在es-head中查询ik是否生效:

logstash把mysql数据导入elasticsearch中并设置分词器

成功!!!