Kafka连接自定义timestamp.extractor
我试图从卡夫卡读取消息到S3,有问题添加jar到Kafka连接类路径。Kafka连接自定义timestamp.extractor
目标是根据时间戳在分区中写入消息,该时间戳是卡夫卡消息中密钥的一部分。
为了使故事简短,我必须提供自定义时间戳提取器。在文档here之后创建了一个类,该类实现TimestampExtractor
接口并将一个JAR位置添加到plugin.path
属性。
问题是,当我开始连接时,找不到类。不知怎的,罐子是不是在classpath中,我越来越
org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor
其他数据:
版本:铺满4.0.0
连接:连接独立
起动指令:
sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties
Aprreaciate any help。
为了可以定制时间戳提取类,你的S3接口,您将需要以下内容:
-
与您的自定义类添加的jar与其他连接器的依赖一起。例如:
保存
./share/java/kafka-connect-s3
下,如果你想这是 仅在S3连接器,或在./share/java/kafka-connect-storage-common
将其提供给 所有存储片连接器(S3目前和HDFS连接器)。 - 确保您的自定义类实现了
io.confluent.connect.storage.partitioner.TimestampExtractor
界面。 -
当您在连接器的配置中设置
timestamp.extractor
属性时,请使用完全限定的类名,当然还要确保它与您在jar中定义和打包的包相匹配。例如:timestamp.extractor=me.connectors.MyTimestampExtractor
最后,你会遵循类似的过程,以使自定义分区提供给您的连接器。
作为一个魅力工作 –
感谢这一点,但是,你可以澄清为什么'plugin.path'不能加载自定义提取器或分区? –
当你说它_doesn't work_,你的意思是它不会从'plugin.path'中的单独目录加载它?这是因为分区器类和timestamp.extractor类都不被视为Connect插件(至少在此时)。连接插件是:连接器,转换器,转换器。 –