Kafka连接自定义timestamp.extractor

问题描述:

我试图从卡夫卡读取消息到S3,有问题添加jar到Kafka连接类路径。Kafka连接自定义timestamp.extractor

目标是根据时间戳在分区中写入消息,该时间戳是卡夫卡消息中密钥的一部分。

为了使故事简短,我必须提供自定义时间戳提取器。在文档here之后创建了一个类,该类实现TimestampExtractor接口并将一个JAR位置添加到plugin.path属性。

问题是,当我开始连接时,找不到类。不知怎的,罐子是不是在classpath中,我越来越

org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor 

其他数据:

版本:铺满4.0.0

连接:连接独立

起动指令:

sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties

Aprreaciate any help。

为了可以定制时间戳提取类,你的S3接口,您将需要以下内容:

  • 与您的自定义类添加的jar与其他连接器的依赖一起。例如:

    保存./share/java/kafka-connect-s3下,如果你想这是 仅在S3连接器,或在 ./share/java/kafka-connect-storage-common将其提供给 所有存储片连接器(S3目前和HDFS连接器)。

  • 确保您的自定义类实现了io.confluent.connect.storage.partitioner.TimestampExtractor界面。
  • 当您在连接器的配置中设置timestamp.extractor属性时,请使用完全限定的类名,当然还要确保它与您在jar中定义和打包的包相匹配。例如:

    timestamp.extractor=me.connectors.MyTimestampExtractor

最后,你会遵循类似的过程,以使自定义分区提供给您的连接器。

+0

作为一个魅力工作 –

+0

感谢这一点,但是,你可以澄清为什么'plugin.path'不能加载自定义提取器或分区? –

+0

当你说它_doesn't work_,你的意思是它不会从'plugin.path'中的单独目录加载它?这是因为分区器类和timestamp.extractor类都不被视为Connect插件(至少在此时)。连接插件是:连接器,转换器,转换器。 –