卡夫卡流字数统计应用程序
问题描述:
我玩的卡夫卡流API(Kakfa版本:0.10.2.0)试图做一个简单的wordcount示例工作:Wordcount App gist。我同时运行生产者和消费者的控制台:卡夫卡流字数统计应用程序
./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092
./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning
启动应用程序,一切似乎是工作的罚款,但是当我在控制台内生产一些字符串类型,消费者接受什么都没有。如果我改变了应用程序做对消费者接收流输入一个简单的toUppercase(修改为大写)罚款:
//The following code works fine: val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase()) uppercasedWithMapValues.to("output-topic")
有谁知道为什么我的字计数例如接收什么?我应该在消费者上指定任何序列化程序吗?在我的最后一次测试控制台消费者处理,我通过控制台发送,但并没有表现出他们的消息,请参阅下面的输出:
➜ bin ./kafka-console-consumer.sh \
--topic output-topic \
--bootstrap-server localhost:9092 \
--from-beginning
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 :
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned
to any members in the group console-consumer-91651 : [output-topic]
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
^CProcessed共有7级的消息
的
答
KStream
的作品,因为它不使用缓存。对于KTable
你必须等一下,或者将cache.max.bytes.buffering
设置为0
(但不是在生产代码!)
太棒了!这就是诀窍!非常感谢你!我想我需要阅读更多关于kafka流内部的内容。再次感谢你@Arek – ardlema
我很高兴帮助你@ardlema :) – Arek