Kafka Connect SourceTask的轮询时间间隔

Kafka Connect SourceTask的轮询时间间隔

问题描述:

我正在使用Kafka-Connect API实现自定义源连接器,可用于轮询REST-API并将JSON响应吸收到Kafka主题中。现在我想知道如何实现SourceTask的轮询间隔,JDBC Connector如何提供。某处我必须设置线程进入睡眠状态,但是我必须在哪里执行此操作?Kafka Connect SourceTask的轮询时间间隔

我在SourceTask实现加入long类型的私有字段来存储时间戳解决了这个用例。在调用的第一个poll()字段尚未初始化,因此配置的REST-API会被轮询。虽然这第一次调用所提到的long字段获得与当前时间戳的初始化。在以下所有调用poll()调用中,先前调用get的这个时间戳被检查。如果自上一次poll()以来经过的毫秒数小于两次轮询之间配置的时间间隔,则会在配置的毫秒过后自动发送线程进入睡眠模式。

使用max.poll.interval.ms。

请参考以下链接:

https://kafka.apache.org/documentation/

+0

这不完全是我要找的。 _max.poll.interval.ms_定义_poll_ invocations之间的最大差距,在消费者离开该组之前,例如,如果线程被阻塞。它像超时一样工作。但是我正在寻找一种解决方案来控制_poll_方法被调用的频率。默认情况下,它不断,我正在寻找一种解决方案,从连接线程调用_poll_例如每五分钟一班。 – Mabi