Kafka将单个日志事件行聚合到组合日志事件中
问题描述:
我正在使用Kafka处理日志事件。我对Kafka Connect和Kafka Streams有简单连接器和流转换的基本知识。Kafka将单个日志事件行聚合到组合日志事件中
现在我有具有以下结构的日志文件:
timestamp event_id event
日志事件具有由event_id的连接的多个日志行(例如,邮件日志)
实施例:
1234 1 START
1235 1 INFO1
1236 1 INFO2
1237 1 END
而且一般有多个事件:
Examp勒:
1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END
的时间窗口(开始和结束之间)可达到5分钟。
至于结果我要像
event_id combined_log
例的话题:
1 START,INFO1,INFO2,END
2 START,INFO2,END
什么是实现这一目标的正确的工具?我试图用卡夫卡流解决它,但我可以弄清楚如何...
答
在你的用例中,你基本上是基于消息有效载荷来重建会话或事务。目前没有内置的,即时可用的支持这种功能。但是,您可以使用Kafka的Streams API的Processor API部分来自行实现此功能。您可以编写自定义处理器,使用状态存储区来跟踪给定密钥何时开始,添加和结束会话/事务。
邮件列表中的一些用户已经在做IIRC,但我不知道现有的代码示例,我可以指出您。
您需要注意的是正确处理乱序数据。在你上面的例子,你列出了正确的顺序所有输入数据:
1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END
但在实践中,信息/记录可能到达了序,像这样(我只显示与关键1
信息,以简化的例子) :
1234 1 START
1237 1 END
1236 1 INFO2
1235 1 INFO1
即使出现这种情况,据我所知,在您的使用情况下,你仍然要将此数据解释为:START -> INFO1 -> INFO2 -> END
而非START -> END
(忽略/丢弃INFO1
和INFO2
=数据丢失)或START -> END -> INFO2 -> INFO1
(顺序不正确,可能也违反了你的语义限制)。
感谢您的回答,我会看看Processor API。是的,订单问题也应该考虑。 – imehl
处理器API是解决方案 - 再次感谢! – imehl
@imehl:或许您想更新您的问题,并提供一些信息,说明您最终做了什么来解决您的问题,现在您找到了解决方案? –