将JSON文件导入Logstash + Elasticsearch + Kibana

问题描述:

因此,我有一个Web平台,每个包含有关该请求的日志数据的请求都会打印一个JSON文件。我可以配置几个关于什么时候应该记录东西的规则,只有在某些级别,等等......将JSON文件导入Logstash + Elasticsearch + Kibana

现在,我一直在玩Logstash + Elasticsearch + Kibana3堆栈,我很想找到一种方法在Kibana中查看这些日志。我的问题是,有没有办法让Logstash导入这些类型的文件,还是我必须为它编写一个自定义输入插件?我搜索了一下,看到了我所看到的,插件是用Ruby编写的,这是一种我没有经验的语言。

Logstash只是一个将各种syslog文件转换为JSON并将它们加载到elasticsearch(或graphite,或...)中的工具。

由于您的文件已经在JSON中,因此不需要logstash。您可以使用curl将它们直接上传到elasticsearch。

Import/Index a JSON file into Elasticsearch

然而,为了与Kibana很好地工作,你的JSON文件需要在最低限度。

  1. Flat-Kibana没有挖掘嵌套的JSON结构。你需要一个简单的键/值对散列。

  2. 有一个可识别的时间戳。

我建议看看JSON文件logstash输出,看看你是否可以按摩你的JSON文件来匹配那个结构。你可以用你支持JSON的 这样的语言来做到这一点。程序jq对于将json从一种格式过滤到另一种格式非常方便。

Logstash格式 - https://gist.github.com/jordansissel/2996677

JQ - http://stedolan.github.io/jq/

+0

我们说logstash无法处理大量带有单个JSON条目的文件吗?否则,现在您必须确定是否存在超过1行,使用curl,否则导入到Logstash中。当然,单行文件与Logstash的观点没有什么不同? – 2014-10-09 18:49:36

+0

我对logstash和json文件没有提及。我只是说,如果logstash无法做到你想做的事情,并且如果数据已经在JSON中,那么做logstash的做法是相当简单的。 – 2014-10-10 23:53:55

Logstash可以导入不同的格式和来源,因为它提供了很多插件。还有其他的日志收集器和转发工具,可以发送日志到logstash,如nxlog,rsyslog,syslog-ng,flume,kafka,fluentd等等。据我所知,大多数人在windows上使用nxlog(尽管它适用于linux),因为它的资源占用少。 (免责声明:我隶属于该项目)

Logstash是处理动态文件一个很好的工具。

下面是一个使用logstash导入您的JSON文件到elasticsearch方式:

配置文件:

{"foo":"bar", "bar": "foo"} 
{"hello":"world", "goodnight": "moon"} 

注意JSON必须是:JSON文件的

input 
{ 
    file 
    { 
     path => ["/path/to/json/file"] 
     start_position => "beginning" 
     sincedb_path => "/dev/null" 
     exclude => "*.gz" 
    } 
} 

filter 
{ 
    mutate 
    { 
     replace => [ "message", "%{message}" ] 
     gsub => [ 'message','\n',''] 
    } 
    if [message] =~ /^{.*}$/ 
    { 
     json { source => message } 
    } 

} 

output 
{ 
    elasticsearch { 
    protocol => "http" 
    codec => json 
    host => "localhost" 
    index => "json" 
    embedded => true 
    } 

    stdout { codec => rubydebug } 
} 

例子在一行中。如果你想解析多行json文件,请替换配置文件中的相关字段:

input 
{ 
    file 
    { 
     codec => multiline 
     { 
      pattern => '^\{' 
      negate => true 
      what => previous     
     } 
     path => ["/opt/mount/ELK/json/*.json"] 
     start_position => "beginning" 
     sincedb_path => "/dev/null" 
     exclude => "*.gz" 
    } 
} 

filter 
{ 
    mutate 
    { 
     replace => [ "message", "%{message}}" ] 
     gsub => [ 'message','\n',''] 
    } 
    if [message] =~ /^{.*}$/ 
    { 
     json { source => message } 
    } 

} 
+0

第一种配置不起作用。 :response => {“create”=> {“_ index”=>“fb”,“_type”=>“logs”,“_id”=>“AVZUyqwOVPEDPgwGc4_k”,“status”=> 400,“error”=> {“type”=>“mapper_parsing_exception”,“reason”=>“解析失败”,“causes_by”=> {“type”=>“illegal_state_exception”,“reason”=>“混合字段类型:class org。 elasticsearch.index.mapper.core.StringFieldMapper $ StringFieldType!= class org.elasticsearch.index.mapper.internal.IdFieldMapper $ IdFieldType on field _id“}}}},:level =>:warn} – KevinOelen 2016-08-04 09:04:04