Zookeeper and Kafka error KeeperErrorCode = NodeExists

Problem description:

I have written a Kafka consumer and producer that worked fine until today. This morning, when I started Zookeeper and Kafka, my consumer could not read any messages, and in the Zookeeper log I found this error:

INFO Got user-level KeeperException when processing sessionid:0x151c41e62e10000 type:create cxid:0x2a zxid:0x1e txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) 

Can you help me? What could have changed in just a few days? I don't understand. Thank you very much.


It looks like you are trying to start another Kafka broker with the same broker id –
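If that is the suspicion, one way to check is to list the broker ids currently registered in Zookeeper. Below is a minimal sketch using the plain Zookeeper Java client; the connection string localhost:2181 is an assumption and should be replaced with your own.

import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class ListBrokerIds {
    public static void main(String[] args) throws Exception {
        // Connect to Zookeeper (localhost:2181 is an assumed address, adjust as needed)
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> {});

        // /brokers/ids holds one ephemeral child znode per live broker
        List<String> ids = zk.getChildren("/brokers/ids", false);
        System.out.println("Registered broker ids: " + ids);

        zk.close();
    }
}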


Thanks, that is strange. I followed the same procedure as on previous occasions. What can I try in order to fix this? – adellarocca


Try restarting Zookeeper followed by Kafka once –

I got this error with Kafka 2.11 running on Windows 7. I don't think this exception is the real problem, because it is logged only at INFO level. Just make sure the broker is still running. Even with this error I could still:

  1. Create and list topics with kafka-topics.bat
  2. Consume topics with kafka-console-consumer.bat
  3. Send messages programmatically with producer.send(new ProducerRecord<String, String>("topic", "hello")) (see the sketch after this list)
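For item 3, a minimal sketch of such a programmatic send with the plain kafka-clients producer could look like the following; the broker address localhost:9092 and the topic name "topic" are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer and flushes pending records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic", "hello"));
        }
    }
}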

I can confirm this, because I ran into the same problem in my Windows environment, but it does not seem to affect functionality –

In my case it does seem to affect functionality, because I cannot consume messages. See the code below:

Vertx instance = VertxConfig.getInstance();

Properties consumerConfig = new Properties(); 
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 

// consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Properties producerConfig = new Properties(); 
    producerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    producerConfig.put("acks", "1"); 

    String topic = "dstv-queue-3"; 
    consumer = KafkaConsumer.create(instance, consumerConfig); 
    producer = KafkaProducer.create(instance, producerConfig, String.class, String.class); 
    consumer.subscribe(topic); 

    instance.setPeriodic(2000, worker -> { 
     KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, "message"); 
     producer.write(record, writeHandler -> { 
      RecordMetadata metadata = writeHandler.result(); 

      //if meta data returned.. 
      if (metadata != null) { 
       long offset = metadata.getOffset(); 
       int partition = metadata.getPartition(); 
       System.out.println("completed write: " + (writeHandler.succeeded() ? "successful" : "failed") + " offset:" + offset + " partition: " + partition); 
      } 
     }); 
    }); 

    AtomicLong counter = new AtomicLong(); 
    consumer.handler(readHandler -> System.out.println(counter.getAndAdd(1) + ". " + readHandler.value() + " was received")); 
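
One thing that stands out in this snippet, independent of the Zookeeper INFO message, is that the consumer configuration never sets a group id, which subscribe()-based consumption typically requires. A minimal sketch of the same consumer setup with a group id added is shown below; it reuses the Vertx instance and imports from the snippet above, and the group name "my-group" is just a placeholder.

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Without a group id, subscribe()-based consumption may fail or throw, depending on the client version.
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // placeholder group name
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = KafkaConsumer.create(instance, consumerConfig);
consumer.subscribe("dstv-queue-3");
consumer.handler(record -> System.out.println("received: " + record.value()));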

I don't understand –