kafka集群搭建
kafka有自带的zookeeper,但是使用上容易遇到一些问题,还是推荐大家单独搭建zookeeper
官方文档:http://kafka.apache.org/documentation/
一、环境说明
准备三台集群机器
172.16.10.11 (kafkabrokerid:0 zookeeperid:1)
172.16.10.12 (kafkabrokerid:1 zookeeperid:2)
172.16.10.13 (kafkabrokerid:2 zookeeperid:3)
二、安装zookeeper
首先在172.16.10.11上进行操作,安装在/opt/ 目录下
1.下载:
cd /opt
wget "http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz"
2.解压
tar -zxvf zookeeper-3.4.13.tar.gz
3.重命名(纯粹是为了使用方便,不做这一步也行)
mv zookeeper-3.4.13 zookeeper
4.建立zookeeper配置文件:
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
5.编辑配置文件
cd /opt/zookeeper/conf
vi zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/logs/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=172.16.10.11:2888:3888
server.2=127.16.10.12:2888:3888
server.3=172.16.10.13:2888:3888
对于配置文件的说明:
tickTime:表示 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔。
dataDir:Zookeeper 保存数据的目录,需要注意,需要在这个目录下面创建myid文件,文件内容为zookeeper当前服务器的id,例如我172.16.10.11 (kafkabrokerid:0 zookeeperid:1) 这台机器,myid的文件内容就是1。
clientPort:zookeeper启动后的监听端口号。
initLimit:这个配置项是用来配置 Zookeeper 接受客户端 初始化连接时最长能忍受多少个心跳时间间隔数。
syncLimit:这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度。
server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是如果集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。配置前需要确认该项配置涉及的端口是否被占用。
6.启动zookeeper
sh /opt/zookeeper/bin/zkServer.sh start
7.查看zookeeper启动日志
tail -f /opt/zookeeper/bin/zookeeper.out
8.检查zookeeper端口监听状态
netstat -anp | grep 2181
如果端口正常监听且启动日志中无报错,则zookeeper 启动成功。
另外两台机器按照上面的流程安装即可,需要注意的是dataDir下的myid文件,如果自己希望当前这个服务器在集群中的id是2, 则需要把文件内容修改为2
三、安装kafka
1.下载:
同样安装在/opt 下面
cd /opt
wget "http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz"
2.解压
tar -zxvf kafka_2.11-2.2.0.tgz
3.重命名
mv kafka_2.11-2.2.0 kafka
4.修改配置文件
vi /opt/kafka/config/server.properties
5.配置文件详情
注释和空行我都去除了,留下的都是有用的配置项
broker.id=0
listeners = PLAINTEXT://172.16.10.11:9092
advertised.listeners=PLAINTEXT://172.16.10.11:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/logs/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=12
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.10.11:2181,172.16.10.12:2181,172.16.10.13:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
挑一些可能需要修改的配置项做说明:
brokerid:kafka在集群中的id标识,各个服务器不可以重复
listeners:指定这台服务器的kafka监听地址
advertised.listeners:配置该项,使得kafka在zookeeper上注册的ip是外网ip,可被外网访问,而非仅仅是局域网ip。
log.dirs:kafka的日志/数据存储位置
num.partitions:每个topic分配的partition数量
log.retention.hours:日志保留时间
zookeeper.connect : zookeeper集群地址
其他的配置项保持默认即可。
6.启动kafka
cd /opt/kafka/bin
nohup ./kafka-server-start.sh ../config/server.properties &
查看nohup.out 日志确认kafka是否正常启动
7.端口检查
netstat -anp | grep 9092
8.创建topic
cd /opt/kafka/bin/
./kafka-topics.sh --create --zookeeper 172.16.10.11:2181 --replication-factor 3 --partitions 1 --topic test-topic
9.生产者发送消息
cd /opt/kafka/bin
./kafka-console-producer.sh --broker-list 172.16.10.11:9092 --topic test-topic
命令执行之后是这样的界面,可以发送消息:
10.消费者消费消息
cd /opt/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 172.16.10.11:9092 --topic test-topic --from-beginning
可以看到收到的消息:
其他两台机器按照以上流程搭建即可,server.properties里面的broker id记得修改
至此集群搭建完成,有疑问请及时留言。