Storm-Kafka在zookeeper集群中未创建节点。
问题描述:
我是usign storm 0.10和kafka 0.9.0.0与storm-kafka。每当我在群集中运行我的拓扑它开始从开始读书,虽然我给zkRoot和消费者的groupId从属性文件 -Storm-Kafka在zookeeper集群中未创建节点。
kafka.zkHosts=myserver.myhost.com:2181
kafka.topic=onboarding-mail-topic
kafka.zkRoot=/kafka-storm
kafka.group.id=onboarding
脱粒机:
BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
String topicName = prop.getProperty("kafka.topic");
String zkRoot = prop.getProperty("kafka.zkRoot");
String groupId = prop.getProperty("kafka.group.id");
//kafka spout conf
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
当我检查饲养员ls /
它不” t告诉我kafka-storm
[controller_epoch, controller, brokers, storm, zookeeper, kafka-manager, admin, isr_change_notification, consumers, config]
答
最后,我想通了。由于从卡夫卡中读取并将偏移量写回卡夫卡的方式受到了不同的控制。
如果在暴风雨群集上运行拓扑不论单个或多个节点的请确保您已设置以下在storm.yaml文件
storm.zookeeper.servers
和
storm.zookeeper.port
性能除了zkHosts和zkRoot和消费者组ID。
或最佳的做法是通过在像创建KafkaSpout设置正确的值来覆盖拓扑中的这些属性 -
BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
String topicName = prop.getProperty("kafka.topic");
String zkRoot = prop.getProperty("kafka.zkRoot");
String groupId = prop.getProperty("kafka.group.id");
String kafkaServers = prop.getProperty("kafka.zkServers");
String zkPort = prop.getProperty("kafka.zkPort");
//kafka spout conf
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.zkServers = Arrays.asList(kafkaServers);
kafkaConfig.zkPort = Integer.valueOf(zkPort);
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
甚至你可以把配置对象,这些价值。这是更好,因为你可能想弥补信息存储到其他一些动物园管理员集群VS拓扑从一个完全不同的经纪人读取消息understanding-
@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
Map stateConf = new HashMap(conf);
List<String> zkServers = _spoutConfig.zkServers;
if (zkServers == null) {
zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if (zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
}
KafkaSpout代码段