【Kafka Monitoring: Kafka Web Console】
Kafka Web Console is an open-source web application for monitoring Kafka.
Its features include:
a list of brokers
a list of the ZooKeeper clusters that Kafka connects to
a list of all topics; drilling into a topic lets you browse its messages and view produce/consume rate graphs
[root@localhost opt]# ls
collectd es5.0 hadoop_data mq path storm096 zookeepe346
elasticsearch-2.0.0-rc1 flume1.6 httpd-2.2.23 nagios php-5.4.10 stormTest.jar
elasticsearch-2.1.1 gnu influxdb nagios-plugins-1.4.13 Python-2.6.6 wget-log
elasticsearch-jdbc-2.2.0.0 grafana-2.5.0 kafka_2.10-0.9.0.1 openssl-1.0.0e Python-2.6.6.tgz
elasticsearch-jdbc-2.2.0.0.zip hadoop kafka-web-console-2.1.0-SNAPSHOT.zip ORCLfmap soft yum-3.2.26.tar.gz
[root@localhost opt]# cat zookeepe346/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeepe346/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
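Before the first start, make sure the configured dataDir actually exists (assuming here that it has not been created yet):
mkdir -p /opt/zookeepe346/data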
[root@localhost opt]# sh zookeepe346/bin/zkServer.sh start
JMX enabled by default
Using config: /opt/zookeepe346/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@localhost opt]# jps
4104 QuorumPeerMain
4121 Jps
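Besides jps, ZooKeeper's four-letter-word commands are a quick way to confirm the server is answering on the client port (a sketch assuming the nc utility is installed):
echo ruok | nc localhost 2181   # a healthy server replies "imok"
echo stat | nc localhost 2181   # prints version, connection count, and server mode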
[root@localhost opt]# unzip kafka-web-console-2.1.0-SNAPSHOT.zip
Archive: kafka-web-console-2.1.0-SNAPSHOT.zip
inflating: kafka-web-console-2.1.0-SNAPSHOT/lib/default.kafka-web-console-2.1.0-SNAPSHOT.jar
inflating: kafka-web-console-2.1.0-SNAPSHOT/lib/finagle-kafka_2.10-0.1.2-SNAPSHOT.jar
[root@localhost opt]# cd kafka-web-console-2.1.0-SNAPSHOT
[root@localhost kafka-web-console-2.1.0-SNAPSHOT]# ls
bin conf lib README.md share
[root@localhost kafka-web-console-2.1.0-SNAPSHOT]# cd bin/
[root@localhost bin]# ls
kafka-web-console kafka-web-console.bat
[root@localhost bin]# cat ../conf/application.conf
# This is the main configuration file for the application.
# ~~~~~
http.port=9001
[root@localhost bin]# sh kafka-web-console
Play server process ID is 4154
[info] play - database [default] connected at jdbc:h2:file:play
[warn] play - Your production database [default] needs evolutions!
INSERT INTO settings (key_, value) VALUES ('PURGE_SCHEDULE', '0 0 0 ? * SUN *');
INSERT INTO settings (key_, value) VALUES ('OFFSET_FETCH_INTERVAL', '30');
[warn] play - Run with -DapplyEvolutions.default=true if you want to run them automatically (be careful)
Oops, cannot start the server.
@74e0p173o: Database 'default' needs evolution!
at play.api.db.evolutions.EvolutionsPlugin$$anonfun$onStart$1$$anonfun$apply$1.apply$mcV$sp(Evolutions.scala:484)
at play.api.db.evolutions.EvolutionsPlugin.withLock(Evolutions.scala:507)
at play.api.db.evolutions.EvolutionsPlugin$$anonfun$onStart$1.apply(Evolutions.scala:461)
at play.api.db.evolutions.EvolutionsPlugin$$anonfun$onStart$1.apply(Evolutions.scala:459)
at scala.collection.immutable.List.foreach(List.scala:318)
at play.api.db.evolutions.EvolutionsPlugin.onStart(Evolutions.scala:459)
at play.api.Play$$anonfun$start$1$$anonfun$apply$mcV$sp$1.apply(Play.scala:88)
at play.api.Play$$anonfun$start$1$$anonfun$apply$mcV$sp$1.apply(Play.scala:88)
at scala.collection.immutable.List.foreach(List.scala:318)
at play.api.Play$$anonfun$start$1.apply$mcV$sp(Play.scala:88)
at play.api.Play$$anonfun$start$1.apply(Play.scala:88)
at play.api.Play$$anonfun$start$1.apply(Play.scala:88)
at play.utils.Threads$.withContextClassLoader(Threads.scala:18)
at play.api.Play$.start(Play.scala:87)
at play.core.StaticApplication.<init>(ApplicationProvider.scala:52)
at play.core.server.NettyServer$.createServer(NettyServer.scala:243)
at play.core.server.NettyServer$$anonfun$main$3.apply(NettyServer.scala:279)
at play.core.server.NettyServer$$anonfun$main$3.apply(NettyServer.scala:274)
at scala.Option.map(Option.scala:145)
at play.core.server.NettyServer$.main(NettyServer.scala:274)
at play.core.server.NettyServer.main(NettyServer.scala)
On first startup you need to pass an extra flag:
./kafka-web-console -DapplyEvolutions.default=true
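If the console should keep running after the terminal closes, one common approach (not part of the original transcript) is to start it under nohup:
nohup ./kafka-web-console -DapplyEvolutions.default=true > console.log 2>&1 &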
Screenshots of the console UI:
1) Zookeeper menu
2) Brokers menu
3) Topics menu
4) Topics menu (with messages)
[root@localhost bin]# sh kafka-web-console -DapplyEvolutions.default=true
Play server process ID is 4233
[info] play - database [default] connected at jdbc:h2:file:play
[info] play - Starting application default Akka system.
[info] play - Application started (Prod)
[info] play - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
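Note that the server came up on port 9000 even though application.conf sets http.port=9001; see the note on ports at the end. A quick reachability check, assuming curl is available:
curl -sI http://localhost:9000 | head -n 1   # expect an HTTP status line from the Play server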
5) Consumer group status for a topic
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --topic test1 --partitions 1 --replication-factor 1
Created topic "test1".
[root@localhost bin]# [2017-06-23 14:34:00,497] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test1,0] (kafka.server.ReplicaFetcherManager)
[2017-06-23 14:34:00,653] INFO Completed load of log test1-0 with log end offset 0 (kafka.log.Log)
[2017-06-23 14:34:00,688] INFO Created log for partition [test1,0] in /tmp/kafka-logs with properties {flush.messages -> 9223372036854775807, segment.bytes -> 1073741824, preallocate -> false, cleanup.policy -> delete, delete.retention.ms -> 86400000, segment.ms -> 604800000, min.insync.replicas -> 1, file.delete.delay.ms -> 60000, retention.ms -> 604800000, max.message.bytes -> 1000012, index.interval.bytes -> 4096, segment.index.bytes -> 10485760, retention.bytes -> -1, segment.jitter.ms -> 0, min.cleanable.dirty.ratio -> 0.5, compression.type -> producer, unclean.leader.election.enable -> true, flush.ms -> 9223372036854775807}. (kafka.log.LogManager)
[2017-06-23 14:34:00,693] INFO Partition [test1,0] on broker 0: No checkpointed highwatermark is found for partition [test1,0] (kafka.cluster.Partition)
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --topic test2 --partitions 1 --replication-factor 2
Error while executing topic command : replication factor: 2 larger than available brokers: 1
[2017-06-23 14:34:51,064] ERROR kafka.admin.AdminOperationException: replication factor: 2 larger than available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:77)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:236)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:105)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
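The error is expected: a topic's replication factor cannot exceed the number of live brokers, and this test cluster has only one. To confirm how many brokers are registered, you can query ZooKeeper with the zookeeper-shell.sh shipped in Kafka's bin directory (a sketch; paths assumed):
./zookeeper-shell.sh localhost:2181 ls /brokers/ids
# a single-broker cluster prints: [0]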
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --topic test2 --partitions 4 --replication-factor 1
Created topic "test2".
[root@localhost bin]# [2017-06-23 14:35:04,588] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test2,1],[test2,2],[test2,3],[test2,0] (kafka.server.ReplicaFetcherManager)
[2017-06-23 14:35:04,643] INFO Completed load of log test2-1 with log end offset 0 (kafka.log.Log)
[2017-06-23 14:35:04,681] INFO Created log for partition [test2,1] in /tmp/kafka-logs with properties {flush.messages -> 9223372036854775807, segment.bytes -> 1073741824, preallocate -> false, cleanup.policy -> delete, delete.retention.ms -> 86400000, segment.ms -> 604800000, min.insync.replicas -> 1, file.delete.delay.ms -> 60000, retention.ms -> 604800000, max.message.bytes -> 1000012, index.interval.bytes -> 4096, segment.index.bytes -> 10485760, retention.bytes -> -1, segment.jitter.ms -> 0, min.cleanable.dirty.ratio -> 0.5, compression.type -> producer, unclean.leader.election.enable -> true, flush.ms -> 9223372036854775807}. (kafka.log.LogManager)
[2017-06-23 14:35:04,691] INFO Partition [test2,1] on broker 0: No checkpointed highwatermark is found for partition [test2,1] (kafka.cluster.Partition)
[2017-06-23 14:35:04,824] INFO Completed load of log test2-2 with log end offset 0 (kafka.log.Log)
[2017-06-23 14:35:04,832] INFO Created log for partition [test2,2] in /tmp/kafka-logs with properties {flush.messages -> 9223372036854775807, segment.bytes -> 1073741824, preallocate -> false, cleanup.policy -> delete, delete.retention.ms -> 86400000, segment.ms -> 604800000, min.insync.replicas -> 1, file.delete.delay.ms -> 60000, retention.ms -> 604800000, max.message.bytes -> 1000012, index.interval.bytes -> 4096, segment.index.bytes -> 10485760, retention.bytes -> -1, segment.jitter.ms -> 0, min.cleanable.dirty.ratio -> 0.5, compression.type -> producer, unclean.leader.election.enable -> true, flush.ms -> 9223372036854775807}. (kafka.log.LogManager)
[2017-06-23 14:35:04,834] INFO Partition [test2,2] on broker 0: No checkpointed highwatermark is found for partition [test2,2] (kafka.cluster.Partition)
[2017-06-23 14:35:04,873] INFO Completed load of log test2-3 with log end offset 0 (kafka.log.Log)
[2017-06-23 14:35:04,879] INFO Created log for partition [test2,3] in /tmp/kafka-logs with properties {flush.messages -> 9223372036854775807, segment.bytes -> 1073741824, preallocate -> false, cleanup.policy -> delete, delete.retention.ms -> 86400000, segment.ms -> 604800000, min.insync.replicas -> 1, file.delete.delay.ms -> 60000, retention.ms -> 604800000, max.message.bytes -> 1000012, index.interval.bytes -> 4096, segment.index.bytes -> 10485760, retention.bytes -> -1, segment.jitter.ms -> 0, min.cleanable.dirty.ratio -> 0.5, compression.type -> producer, unclean.leader.election.enable -> true, flush.ms -> 9223372036854775807}. (kafka.log.LogManager)
[2017-06-23 14:35:04,893] INFO Partition [test2,3] on broker 0: No checkpointed highwatermark is found for partition [test2,3] (kafka.cluster.Partition)
[2017-06-23 14:35:04,966] INFO Completed load of log test2-0 with log end offset 0 (kafka.log.Log)
[2017-06-23 14:35:04,974] INFO Created log for partition [test2,0] in /tmp/kafka-logs with properties {flush.messages -> 9223372036854775807, segment.bytes -> 1073741824, preallocate -> false, cleanup.policy -> delete, delete.retention.ms -> 86400000, segment.ms -> 604800000, min.insync.replicas -> 1, file.delete.delay.ms -> 60000, retention.ms -> 604800000, max.message.bytes -> 1000012, index.interval.bytes -> 4096, segment.index.bytes -> 10485760, retention.bytes -> -1, segment.jitter.ms -> 0, min.cleanable.dirty.ratio -> 0.5, compression.type -> producer, unclean.leader.election.enable -> true, flush.ms -> 9223372036854775807}. (kafka.log.LogManager)
[2017-06-23 14:35:04,975] INFO Partition [test2,0] on broker 0: No checkpointed highwatermark is found for partition [test2,0] (kafka.cluster.Partition)
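You can verify the partition layout of the new topic with --describe:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2
# expect 4 partitions with replication factor 1, all led by broker 0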
6) Consumer group list for a topic, showing the consume and produce rates
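For the rate graphs to show anything, some traffic has to flow. A minimal sketch using the console clients bundled with Kafka 0.9, assuming the broker listens on the default localhost:9092 (run each command in its own terminal):
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1    # terminal 1: type messages to produce
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning    # terminal 2: consume via a ZooKeeper-registered group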
Changing the HTTP port:
The default is port 9000.
Setting http.port in conf/application.conf does not seem to take effect.
Instead, you can pass the port on the command line:
./kafka-web-console -Dhttp.port=9001
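Both system properties can be combined on one command line, e.g. for a first start on a non-default port:
./kafka-web-console -DapplyEvolutions.default=true -Dhttp.port=9001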