实际值无法识别从Kinesis Firehose发送JSON数据到elasticsearch
问题描述:
我在Kibana中遇到问题,在以下行中解释了字段value
。我会尽力解释这种情况。实际值无法识别从Kinesis Firehose发送JSON数据到elasticsearch
我将dynamoDB流发送到Lambda,然后发送到Kenesis Firehouse,最后从Firehose发送到Elasticsearch。我使用Kibana来可视化数据,这里是我遇到问题的地方。
比方说,我要送这个JSON来DynamoDB:
拉姆达我收到以下:
{
"data": {
"M": {
"machine": {
"M": {
"application": {
"S": "application"
},
"brand": {
"S": "band"
}
}
},
"description": {
"S": "This is the description"
},
"id": {
"S": "identificator"
},
"units": {
"S": "units"
},
"value": {
"N": "33"
},
"_msgid": {
"S": "85209b75.f51ee8"
},
"timestamp": {
"S": "2017-05-09T06:38:00.337Z"
}
}
},
"id": {
"S": "85209b75.f51ee8"
}
}
如果我转发这最后JSON来室壁运动流水,在Kibana时我配置索引模式,它自动识别"timestamp"
(这很好)。这里的问题是字段"value"
就像一个字符串,它不被识别。
我试图修改JSON,然后再发送到流水,但随后Kibana不承认"timestamp"
:
{
"data": {
"machine": {
"application": "application",
"brand": "brand"
},
"description": "This is the description",
"id": "identificator",
"units": "KWh",
"value": 33,
"_msgid": "85209b75.f51ee8",
"timestamp": "2017-05-09T06:38:00.337Z"
},
"id": "85209b75.f51ee8"
}
我想知道我怎么能发送这个数据和Kibana承认“时间戳“和”值“字段。
这是我使用的拉姆达的代码示例:
var AWS = require('aws-sdk');
var unmarshalJson = require('dynamodb-marshaler').unmarshalJson;
var firehose = new AWS.Firehose();
exports.lambda_handler = function(event, context) {
var record = JSON.stringify(event.Records[0].dynamodb.NewImage);
console.log("[INFO]:"+JSON.stringify(event.Records[0].dynamodb.NewImage));
var params = {
DeliveryStreamName: 'DeliveryStreamName',
Record:{
Data: record
}
};
firehose.putRecord(params, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else console.log(JSON.stringify(data)); // successful response
context.done();
});
};
答
我解决它通过创建自己的索引映射,而不是让室壁运动流水创建它。并声明"timestamp"
属性作为{ "type" : "date" }
和"value"
attibute为{ "type" : "float" }
例如,对于这种类型的JSON的:
{
"data": {
"timestamp": "2017-05-09T11:30:41.484Z",
"tag": "tag",
"value": 33,
"units": "units",
"type": "type",
"machine":{
"name": "name",
"type": "type",
"company": "company"
}
},
"id": "85209b75.f51ee8"
}
我手动创建以下elasticsearch指数和映射:
PUT /index
{
"settings" : {
"number_of_shards" : 2
},
"mappings" : {
"type" : {
"properties" : {
"data" : {
"properties" : {
"machine":{
"properties": {
"name": { "type" : "text" },
"type": { "type" : "text" },
"company": { "type" : "text" }
}
},
"timestamp": { "type" : "date" },
"tag" : { "type" : "text" },
"value": { "type" : "float" },
"description": { "type" : "text" },
"units": { "type" : "text" },
"type" : { "type" : "text" },
"_msgid": { "type" : "text" }
}
},
"id": { "type" : "text" }
}
}
}
}
所以,为了解决这个问题,我认为在lambda中你需要检查索引映射是否存在,如果不是你自己创建的更好的解决方案。