Kafka将单个日志事件行聚合到组合日志事件中

问题描述:

我正在使用Kafka处理日志事件。我对Kafka Connect和Kafka Streams有简单连接器和流转换的基本知识。Kafka将单个日志事件行聚合到组合日志事件中

现在我有具有以下结构的日志文件:

timestamp event_id event 

日志事件具有由event_id的连接的多个日志行(例如,邮件日志)

实施例:

1234 1 START 
1235 1 INFO1 
1236 1 INFO2 
1237 1 END 

而且一般有多个事件:

Examp勒:

1234 1 START 
1234 2 START 
1235 1 INFO1 
1236 1 INFO2 
1236 2 INFO3 
1237 1 END 
1237 2 END 

的时间窗口(开始和结束之间)可达到5分钟。

至于结果我要像

event_id combined_log 

例的话题:

1 START,INFO1,INFO2,END 
2 START,INFO2,END 

什么是实现这一目标的正确的工具?我试图用卡夫卡流解决它,但我可以弄清楚如何...

在你的用例中,你基本上是基于消息有效载荷来重建会话或事务。目前没有内置的,即时可用的支持这种功能。但是,您可以使用Kafka的Streams API的Processor API部分来自行实现此功能。您可以编写自定义处理器,使用状态存储区来跟踪给定密钥何时开始,添加和结束会话/事务。

邮件列表中的一些用户已经在做IIRC,但我不知道现有的代码示例,我可以指出您。

您需要注意的是正确处理乱序数据。在你上面的例子,你列出了正确的顺序所有输入数据:

1234 1 START 
1234 2 START 
1235 1 INFO1 
1236 1 INFO2 
1236 2 INFO3 
1237 1 END 
1237 2 END 

但在实践中,信息/记录可能到达了序,像这样(我只显示与关键1信息,以简化的例子) :

1234 1 START 
1237 1 END 
1236 1 INFO2 
1235 1 INFO1 

即使出现这种情况,据我所知,在您的使用情况下,你仍然要将此数据解释为:START -> INFO1 -> INFO2 -> END而非START -> END(忽略/丢弃INFO1INFO2 =数据丢失​​)或START -> END -> INFO2 -> INFO1(顺序不正确,可能也违反了你的语义限制)。

+0

感谢您的回答,我会看看Processor API。是的,订单问题也应该考虑。 – imehl

+1

处理器API是解决方案 - 再次感谢! – imehl

+0

@imehl:或许您想更新您的问题,并提供一些信息,说明您最终做了什么来解决您的问题,现在您找到了解决方案? –