Flink - 查询卡夫卡主题用于消费群体的抵消?

问题描述:

问:我怎样才能查询码弗林克内部的特定消费群的抵消了卡夫卡的话题? (和侧面的问题(如果需要的话,会在这里提出一个新问题)如果可能的话,我可以得到该偏移量的时间戳吗?Flink - 查询卡夫卡主题用于消费群体的抵消?

(我发现有cli工具来查询它,但是这不是我想要的,因为它不是编程方式弗林克工作中完成的)

上满问题的一些额外的背景,但我不想让这个过于开放式的。

我有其中数据将被从kafkaTopic1流入的程序的使用情况下(让我们称之为P1),处理,然后保存到数据库中。P1将是一个多节点集群上,以便每个节点将处理许多卡夫卡分区(允许说有该主题的5个节点和50个卡夫卡分区)。如果其中一个节点由于某种原因完全失败并且正在处理数据,那么该数据将会丢失。

例如,如果kafkaTopic1上有500条消息并且node2已经拉动了10条消息(因此根据偏移量拉取的下一条消息是消息11),但只有8条消息已经完全处理并保留到数据库节点失败,仍然正在处理的2将会丢失。而当节点恢复起来将开始从消息11读取,跳过两个丢失的消息(上和技术上卡夫卡分区将开始发送其消息到另一个节点进行处理,以便在该分区的偏移会移动,我们不当节点死亡时,必然确切地知道下一个要处理的消息)。

(注:当节点死亡,假设用户通知和断开P1完全所以没有更多的数据将在这个点进行处理,暂时)。

因此,这是弗林克用武之地。我想做一个flink作业,可以通过用户的参数告诉P1的使用者组,然后查询kafka主题(也由用户提供)以获取当前偏移量(OS1)。然后,flink作业将设置其偏移量为kafkaTopic1为OS1之前的X个时间量(X由用户通过参数提供)并开始读取来自kafka主题的消息。然后,它会将它读取的每条消息与数据库中的内容进行比较,如果它未在数据库中找到该消息,则会将其发送到另一个kafka主题(kafkaTopic2),以在重新启动时由P1处理。

如果检查点是在弗林克作业启用,那么你不应该失去消息,因为弗林克保持偏移内部以及从故障恢复后,就应该从偏移弗林克最后提交的读取。

现在,如果您仍然希望找到偏移量并重新从偏移量中读取数据,这会变得棘手,因为您需要为给定使用者组找到给定主题的所有分区的偏移量。

我不知道如何从Flink-kafka-Consumer API开箱即可完成此任务,但是您可以将kafka依赖项添加到您的项目中,并从Kafka API创建一个kafkaconsumer。一旦你的消费者,你可以拨打

consumer.position(partition) 

consumer.committed(partition) 

记住,你仍然需要遍历所有分区让所有的电流失调

阅读对这里的区别:Kafka Javadoc

一旦你有你想从中读取数据的偏移量,你可以使用类似下面的手动在弗林克作业指定消费者偏移:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); 
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); 
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L); 
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L); 

myConsumer.setStartFromSpecificOffsets(specificStartOffsets); 

为弗林克 - 卡夫卡消费更多信息,检查了这一点Flink Kafka Connector

+0

谢谢为什么我需要做的是因为那将是失败的程序(P1)是不是一个弗林克,我们需要的弗林克程序我的工作基本上告诉P1的原因:“嘿,你的偏移量是一个50级的时候你只将消息1到25的结果保存到数据库中。重新处理的消息26到49" 。谢谢你,我会向非弗林克卡夫卡消费者去一个让你知道事情是如何工作的。 – Jicaar

+0

如果多数民众赞成的情况下,也许你应该手动提交的偏移量,只有当数据持续到数据库并且不使用enable.auto.commit。所以基本上,禁用“enable.auto.commit”标志和手动启动承诺。这样,当坚持到外部数据库的过程中出现故障,它不会被提交到卡夫卡。 –

+0

我建议为好,但它听起来就像是太复杂了(是短型)。其中,数据可以保存到数据库听起来像的最大障碍多个可能的“出口点”。如果记录1来通过和花费的时间比纪录2进行处理,其他值得关注的是记录2的承诺偏移会被重写记录1的偏移这将表明创纪录的2还没有被处理。如果是有道理的 – Jicaar