使用Kafka Connect来导入/导出数据

从控制台读出数据并将其写回是十分方便操作的,但你可能需要使用其他来源的数据或将数据从Kafka导出到其他系统。针对这些系统, 你可以使用Kafka Connect来导入或导出数据,而不是写自定义的集成代码。

Kafka Connect是Kafka的一个工具,它可以将数据导入和导出到Kafka。它是一种可扩展工具,通过运行connectors(连接器), 使用自定义逻辑来实现与外部系统的交互。 在本文中,我们将看到如何使用简单的connectors来运行Kafka Connect,这些connectors 将文件中的数据导入到Kafka topic中,并从中导出数据到一个文件。

第一: 我们将创建一些种子数据来进行测试:

$ echo -e 'hadoop\nspark\nkafka'>test.txt

使用Kafka Connect来导入/导出数据

第二:配置connect-standalone.properties

使用Kafka Connect来导入/导出数据

第二步:配置connect-file-source.properties

使用Kafka Connect来导入/导出数据

注意:

这个地方遇到的坑,

① 如果topic=liucf不是个干净的topic也就是数有历史数据切格式不是json则在下一步消费到文件会报异常

 JsonParseException: Unrecognized token 'xxx': was expecting ('true', 'false' or 'null') 

② 如果启动过一次 bin/connect-standalone.sh config/connect-standalone.properties 然后我们这里又更改了

connect-file-source.properties里的topic,配置成了新的topic名称则需要删除connect.offsets 第一步中的配置的目录里可以找到

第三步:connect-file-sink.properties

使用Kafka Connect来导入/导出数据

第三步:启动

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

使用Kafka Connect来导入/导出数据

第四步:验证文件test.txt的内容是否生产到topic:liucf

使用Kafka Connect来导入/导出数据

已经在topic:liucf里了

使用Kafka Connect来导入/导出数据

第五步:验证是否消费到test.sink.txt文件

使用Kafka Connect来导入/导出数据

完成