ELK 之 logStash导出多个表和多表关联查询结果 到mysql数据到es

MySQL与Elasticsearch 字段类型的对应
https://blog.****.net/u013545439/article/details/102799518

//首先需要创建索引和mapping

logstash :可以更新 添加检测 ,不能删除检测

导出多个表如下:

在/data/software/es/logstash-7.7.0/config 目录下新建一个test-multi-logstash.conf文件,内容如下:

ELK 之 logStash导出多个表和多表关联查询结果 到mysql数据到es



input {
    jdbc {
        type => "id"

        #数据库连接信息
        jdbc_connection_string => "jdbc:mysql://rdsiinfrfy*******936820.mysql.rds.aliyuncs.com:3306/ewj_market?&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"

        #设置时区
        jdbc_default_timezone => "Asia/Shanghai"

        #数据库用户名
        jdbc_user => "tomer"

        #数据库密码
        jdbc_password => "Ywjm****52650"

        #数据库驱动路径
        jdbc_driver_library => "./lib/jars/mysql-connector-java-8.0.15.jar"
        #jdbc_driver_library => "/data/software/es/logstash-libs/connector-java-5.1.29-bin.jar"

        #驱动类名
        jdbc_driver_class => "com.mysql.jdbc.Driver"

        jdbc_paging_enabled => "true"

        jdbc_page_size => "5"

        #定时执行频率 每分钟
        schedule => "* * * * *"

        #使用字段追踪
        #use_column_value => true

        #追踪字段
        #tracking_column => "id"

        #追踪字段类型
        #tracking_column_type => "int"

       

 #是否记录上一次执行到的追踪字段的值
        record_last_run => true

        #上一次执行到的追踪字段的值存放路径 手动创建
        last_run_metadata_path => "./logs/logstash_default_last_time.log"

        #是否清除last_run_metadata_path记录,如果为true,每次都从头开始查询所有数据,增量更新设置为false
        clean_run => false

        #是否将字段强制转为小写
        lowercase_column_names => false

        #sql语句,可用 statement_filepath 参数代替,值为执行的sql文件,注意要查询追踪字段
        statement => "SELECT * from mytest_user"
    }
   

jdbc {
        type => "article_idx"

        #数据库连接信息
        jdbc_connection_string => "jdbc:mysql://rdsiinf36820.mysql.rds.aliyuncs.com:3306/ewj_market?&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"

        #设置时区
        jdbc_default_timezone => "Asia/Shanghai"

        #数据库用户名
        jdbc_user => "toser"

        #数据库密码
        jdbc_password => "Ywj650"

        #数据库驱动路径
        jdbc_driver_library => "./lib/jars/mysql-connector-java-8.0.15.jar"

        #驱动类名
        jdbc_driver_class => "com.mysql.jdbc.Driver"

        jdbc_paging_enabled => "true"

        jdbc_page_size => "5"

        #定时执行频率 每分钟
        schedule => "* * * * *"

        #使用字段追踪
        #use_column_value => true

        #追踪字段
        #tracking_column => "created"

        #追踪字段类型
        #tracking_column_type => "timestamp"

        #是否记录上一次执行到的追踪字段的值
        record_last_run => true

 #上一次执行到的追踪字段的值存放路径 手动创建
        last_run_metadata_path => "./logs/article_idx.log"

        #是否清除last_run_metadata_path记录,如果为true,每次都从头开始查询所有数据,增量更新设置为false
        clean_run => false

        #是否将字段强制转为小写
        lowercase_column_names => false

        #sql语句,可用 statement_filepath 参数代替,值为执行的sql文件,注意要查询追踪字段
        statement => "select  art.id,art.title,art.expert,art.author_id,art.created from  mytest_article art"
    }
}

filter {

}

output {
    if[type] == "id" {
        elasticsearch {
            #ES地址:端口
            hosts => ["121.40.42.216:9200"]

            #自定义索引
            index => "mytest"

            #设置自增主键ID
            document_id => "%{id}"
            user => "elastic"
            password => "elasticCll"
        }
    }
    if[type] == "article_idx" {
        elasticsearch {
            #ES地址:端口
            hosts => ["121.40.42.216:9200"]

            #自定义索引
            index => "mytest_article_idx"

            #设置自增主键ID
            document_id => "%{id}"
            user => "elastic"
            password => "elasticCll"
        }
    }
    stdout {
        #以JSON格式输出
        codec => json_lines
    }
}
 



之后在根目录下,执行:./bin/logstash -f ./config/test-multi-logstash.conf


组合查询:多表查询  到mysql数据到es

首先创建索引,如下:

put user_article_idx
{
  "settings": {
    "analysis": {
      "analyzer": {
        "default": {
          "type": "ik_max_word"
        }
      }
    }
  },
  "mappings": {
    "properties": {
        "id":{
          "type":"long"
        },
          "name":{
          "type": "text"
        },
        "age":{
          "type":"long"
        }, 
        "artId":{
          "type":"long"
        },
        
        "title":{
          "type": "text"
        },
        "expert":{
          "type": "text"
        },
         "author_id":{
          "type":"long"
        },
        "created": {
          "type": "date"
        }
      }
  }
}



2:导出关联表查询如下:

在/data/software/es/logstash-7.7.0/config 目录下新建一个user-article-logstash.conf 文件,内容如下:

               ELK 之 logStash导出多个表和多表关联查询结果 到mysql数据到es          

              



input {
    jdbc {
        type => "user-article"

        #数据库连接信息
        jdbc_connection_string => "jdbc:mysql://rdsiinfrfyqfaff1363832936820.mysql.rds.aliyuncs.com:3306/ewj_market?&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"

        #设置时区
        jdbc_default_timezone => "Asia/Shanghai"

        #数据库用户名
        jdbc_user => "tomcat_user"

        #数据库密码
        jdbc_password => "Ywjmofing88352650"

        #数据库驱动路径
        jdbc_driver_library => "./lib/jars/mysql-connector-java-8.0.15.jar"
        #jdbc_driver_library => "/data/software/es/logstash-libs/connector-java-5.1.29-bin.jar"

        #驱动类名
        jdbc_driver_class => "com.mysql.jdbc.Driver"

        jdbc_paging_enabled => "true" 
         
        jdbc_page_size => "5"
           
        #定时执行频率 每分钟
        schedule => "* * * * *"
             
        #使用字段追踪 
        #use_column_value => true
 
        #追踪字段
        #tracking_column => "id"
    
        #追踪字段类型
        #tracking_column_type => "int"

#是否记录上一次执行到的追踪字段的值
        record_last_run => true

        #上一次执行到的追踪字段的值存放路径 手动创建
        last_run_metadata_path => "./logs/user_article_log.log"

        #是否清除last_run_metadata_path记录,如果为true,每次都从头开始查询所有数据,增量更新设置为false
        clean_run => false

        #是否将字段强制转为小写
        lowercase_column_names => false

        #sql语句,可用 statement_filepath 参数代替,值为执行的sql文件,注意要查询追踪字段
        statement => "select us.id,us.name,us.age,art.id as artId,art.title,art.expert,art.author_id,art.created from mytest_user us,mytest_article art where us.id=art.author_id"
    }
}

filter {

}

output {
    if[type] == "user-article" {
        elasticsearch {
            #ES地址:端口
            hosts => ["121.40.42.216:9200"]

            #自定义索引
            index => "user_article_idx"

            #设置自增主键ID
            document_id => "%{artId}"
            user => "elastic"
            password => "elasticCll"
        }
    }
    stdout {
        #以JSON格式输出
        codec => json_lines
    }
}
 


注意:

          document_id => "%{artId}"

以artId为文档ID才不会重复,不然会出现文档ID重复,插不进去。



之后在根目录下,执行:./bin/logstash -f ./config/user-article-logstash.conf

ELK 之 logStash导出多个表和多表关联查询结果 到mysql数据到es

 

查询结果如下:

get user_article_idx/_search
{
  "query":
  {
    "match_all": {}
  },
  // "_source":["id","title"],
  "sort":
     [{"id":"desc"}]
    
}

ELK 之 logStash导出多个表和多表关联查询结果 到mysql数据到es