使用jmx显示器kafka主题
问题描述:
我正在使用jmx来监视kafka主题。使用jmx显示器kafka主题
val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker1:9393/jmxrmi");
val jmxc = JMXConnectorFactory.connect(url, null);
val mbsc = jmxc.getMBeanServerConnection();
val messageCountObj = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=mytopic");
val messagesInPerSec = mbsc.getAttribute(messageCountObj,"MeanRate")
使用此代码我可以在broker1上获得“mytopic”的MeanRate。 但我有10个经纪商,我怎么能从所有经纪商那里获得“mytopic”的MeanRate?
我有尝试 “服务:JMX:RMI:/// JNDI/RMI:// BROKER1:9393,broker2:9393,broker3上:9393/jmxrmi”
得到一个错误:(
答
如果是这么简单,这将是很好的;)
如你所述,没有办法做到这一点。您需要为每个经纪人分别建立连接。
一个可行的办法是使用MBeanServer Federation这将在一个MBeanServer中注册为每个经纪人的代理,因此,如果你这样做对BROKER1,你可以连接到service:jmx:rmi:///jndi/rmi://broker1:9393/jmxrmi
和查询统计为您的所有经纪人在一个去,但你需要查询10个不同的ObjectName,查询每个ObjectNames的值,然后自己计算MeanRate。 [Java]伪代码:
ObjectName wildcard = new ObjectName("*:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=mytopic");
double totalRate = 0d;
int respondingBrokers = 0;
for(ObjectName on : mbsc.queryNames(wildcard, null)) {
totalRate += (Double)mbsc.getAttribute(messageCountObj,"MeanRate");
respondingBrokers++;
}
// Average rate of mean rates: totalRate/respondingBrokers
注意:没有异常处理,并且我假定rate类型是Double。
您也可以创建并注册一个计算联合代理上的聚合平均值的自定义MBean。
如果你是面向maven的,你可以从here构建OpenDMK。