如何使用Java中的Kafka 8.2 API生成消息?

问题描述:

我正在尝试在java中使用kafka API。我正在使用以下maven依赖项:如何使用Java中的Kafka 8.2 API生成消息?

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.0</version> 
</dependency> 

我无法连接到远程kafka服务器。 我将kafka'server.properties'文件端口属性更改为端口8080. 我可以启动zookeeper和kafka服务器都没有问题。 我也可以使用随kafka下载的控制台生产者和消费者应用程序。 (斯卡拉2.10版本)

我使用下面的客户端代码来创建远程KafkaProducer

Properties propsProducer = new Properties(); 

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("topic.metadata.refresh.interval.ms", "0"); 

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer); 

一旦我创建了制片人,我可以运行下面一行,并得到有效的主题信息返回,授予strTopic是现有的主题名称。

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic); 

当我尝试发送一个消息,我请执行下列操作:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes()); 

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get(); 

调用send()块无限期,当我手动终止过程中,我看到错误关闭套接字由于kafka服务器上的错误(IOException,连接重置由对等)错误。

此外,host.name,advertised.host.name和advertised.port属性在'server.properties'文件中仍然被注释掉,这是值得的。哦,如果我改变该行:

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 

propsProducer.put("bootstrap.servers", "127.0.0.1:8080"); 

并且安装了卡夫卡服务器在同一台服务器上运行它,它的作品,但我试图与它合作远程。

感谢任何帮助,如果我能澄清一切让我知道。

+1

你是否字面上使用'172.xx.xx.xxx'作为主机IP地址? – 2015-03-30 18:48:52

+0

不,这是一个完整的IP,X的只是口罩。 – 2015-03-30 18:53:00

+0

Kk。也许防火墙问题?您可以使用netcat验证端口8080上的网络连接吗? – 2015-03-30 19:56:50

经过大量的挖掘,我决定实施这里找到的例子:Kafka Producer Example。我缩短了代码,没有实现分区类。我更新了我的pom列出的依赖关系,并且我仍然遇到同样的问题。最终,我做了一些配置更改,一切正常。

最后一块难题是在服务器和客户端机器的/ etc/hosts中定义Kafka服务器。我将以下内容添加到两个文件中。

172.xx.xx.xxx  serverHost1 

再次,x的只是口罩。然后,我将server.properties文件中的advertised.host.name设置为serverHost1。注意:在服务器计算机上运行ifconfig后,我获得了该IP。

我改了行

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080"); 

propsProducer.put("metadata.broker.list", "serverHost1:8080"); 

卡夫卡API不喜欢,我是定义IP作为一个字符串的事实。相反,它从etc/hosts文件中查找IP,虽然文档中提到:

“代理将向生产者和消费者通告的主机名。如果未设置,则使用”host.name“的值(如果已配置)否则,它将使用从java.net.InetAddress.getCanonicalHostName()返回的值。“

这将只返回IP,在字符串形式,我以前使用,如果没有定义在客户端机器的etc/hosts中,否则它返回与IP(serverHost1在我的情况下)配对的名称。另外,我从来没有设置host.name的值。

+0

是bootstrap.servers替换metadata.broker.list? – 2016-08-22 05:55:05

+1

是的,我相信。在0.8.2.0版本中,该字段为“metadata.broker.list”,但在较新的版本中为“boostrap.servers” – 2016-08-22 13:53:33

+0

是的!这是真的。与新的ProducerAPI这是新的配置。 – 2016-08-23 04:24:02