创建`KafkaServer`从Java
问题描述:
我试图启动一个卡夫卡服务器形式的Java创建`KafkaServer`从Java
具体来说,我怎么能翻译斯卡拉的this line入行渣华的?
private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)
我可以轻松地创建serverConfig中,但我似乎无法能够创建kafkaMetricsReporters
参数。
注:我可以创建一个KafkaServerStartable
,但我想创建一个正常的KafkaServer
以避免JVM在发生错误时退出。
Apache的版本卡夫卡0.11.0.1
答
的kafkaMetricsReporters
参数是斯卡拉Seq
。
您可以:
-
创建一个Java集合并将其转换成一个序列:
您需要导入
scala.collection.JavaConverters
:List<KafkaMetricsReporter> reportersList = new ArrayList<>(); ... Seq<KafkaMetricsReporter> reportersSeq = JavaConverters.asScalaBufferConverter(reportersList).asScala();
-
使用
KafkaMetricsReporter.startReporters()
方法来创建它们对于您的配置:作为
KafkaMetricsReporter
是单身,你需要使用MODULE
符号来使用它:Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
另外,KafkaServer
构造有从Java调用它时需要2个其他参数:
-
time
可以很容易地使用new org.apache.kafka.common.utils.SystemTime()
-
threadNamePrefix
是一个选项。如果导入scala.Option
,你就可以调用Option.apply("prefix")
全部放在一起:
Properties props = new Properties();
props.put(...);
KafkaConfig config = KafkaConfig.fromProps(props);
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
KafkaServer server = new KafkaServer(config, new SystemTime(), Option.apply("prefix"), reporters);
server.startup();
谢谢:) 两个问题:你的意思是'reportersList',而不是'reporters'? 'new SystemTime()'来自哪里(哪个包)? – nha
另外,这意味着不会有记者,对吗? (目前我很好,虽然我不得不在某些时候创建一些) – nha
我已经更新了我的答案。 'startReporters()'可能是你想用的 –