检查系统是否连接到Kafka
问题描述:
我需要编写一个Java测试验证系统是否连接到kafka,检查系统是否连接到Kafka
有没有人有任何想法? 我找到了这个帖子:
How to check whether Kafka Server is running?
但它太复杂了从Java代码做的,我不认为这是我应该使用的方向。
在此先感谢。
答
您可以检查服务器是否通过使用此运行:
ZkClient zkClient = new ZkClient("your_zookeeper_server", 5000 /* ZOOKEEPER_SESSION_TIMEOUT */, 5000 /* ZOOKEEPER_CONNECTION_TIMEOUT */, ZKStringSerializer$.MODULE$);
List<Broker> brokers = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
if (brokers.isEmpty()) {
// No brokers available
} else {
// There are brokers available
}
答
我有同样的问题,我不想离开这个问题没有任何答案。 我读了很多关于如何检查连接的信息,并且我发现的大多数答案都是检查与Zk的连接,但我确实想直接使用Kafka服务器检查连接。
我所做的是创建一个简单的KafkaConsumer并列出所有的话题与listTopics()。如果连接成功,那么你会得到一些回报。否则,你会得到一个TimeoutException
。
def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
val props = new Properties()
props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
props.put("group.id", kafkaParams.get("group.id").get.toString)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val simpleConsumer = new KafkaConsumer[String, String](props)
simpleConsumer.listTopics()
}
然后你可以将这个方法包装在try-catch
句子中来捕捉异常。
这是一种检查Zookeeper连接的方法,而不是Kafka服务器。 – dbustosp