消息队列——kafka——搭建集群(linux)
1、集群规划:
应用 |
192.168.1.142 |
192.168.1.143 |
192.168.1.145 |
Jdk8 |
Jdk8 |
Jdk8 |
Jdk8 |
Zookeeper3.4.10 |
Master |
Slave1 |
Slave2 |
kafka2.12 |
Kafka2.12 |
kafka2.12 |
kafka2.12 |
Kafka-manager2.0 |
Kafka-manager |
无 |
无 |
2、集群各节点配置jdk环境:
下载1.8及以上版本的jdk,在集群各个节点上配置好jdk环境变量。
3、集群各节点关闭防火墙:
关闭集群各个节点的防火墙,强烈建议如此,如果不能关闭防火墙,那么需要将zookeeper集群节点通讯端口、kafka集群访问端口等端口信息逐个开通。本次操作直接关闭防火墙。
4、安装zookeeper集群:
以下操作在zookeeper集群各节点上对应执行。
(1)集群参数规划:
参数名 |
192.168.1.142 |
192.168.1.143 |
192.168.1.144 |
dataDir |
/data/soft/zookeeper/data |
/data/soft/zookeeper/data |
/data/soft/zookeeper/data |
clientPort |
2181 |
2181 |
2181 |
server.A |
server.0=192.168.1.142:2888:3888 server.1=192.168.1.143:2888:3888 server.2=192.168.1.144:2888:3888 |
server.0=192.168.1.142:2888:3888 server.1=192.168.1.143:2888:3888 server.2=192.168.1.144:2888:3888 |
server.0=192.168.1.142:2888:3888 server.1=192.168.1.143:2888:3888 server.2=192.168.1.144:2888:3888 |
文件:myid |
0 |
1 |
2 |
(2)软件下载:
下载地址:https://zookeeper.apache.org/releases.html#download
(3)解压软件包:
将下载的软件包放到安装路径下进行解压。
(4)配置文件重命名:
将解压后的文件夹中config内zoo_sample.cfg重命名成zoo.cfg
(5)修改配置参数:
A、数据存储路径:
参数:dataDir,配置为zookeeper数据存储路径。
B、服务端口号:
参数:clientPort,值为连接zookeeper服务的端口号。
C、集群各节点通信地址:
参数:server.A=B:C:D,值为zookeeper集群各节点通讯地址。
ABCD意义如下:
A:其中 A 是一个数字,表示这个是服务器的编号,与myid文件保持一致;
B:是这个服务器的 ip 地址;
C:Leader选举的端口;
D:Zookeeper服务器之间的通信端口。
例如:
server.0=192.168.1.142:2888:3888
server.1=192.168.1.143:2888:3888
server.2=192.168.1.144:2888:3888
(6)创建myid文件:
在配置的数据存储路径参数dataDir路径下创建myid文件,文件中写上该zookeeper集群节点的编号,编号就是配置的server.A中的A值。
(7)配置环境变量:
配置环境变量ZOOKEEPER_HOME,值为zookeeper软件解压路径。然后在path中添加%ZOOKEEPER_HOME%/bin;
(8)启动zookeeper集群:
将zookeeper集群各节点逐个进行启动,各节点执行命令:
./zkServer.sh start;
(9)停止zookeeper集群:
将zookeeper集群各节点逐个进行停止,各节点执行命令:
./zkServer.sh stop;
(10)查看zookeeper集群:
将zookeeper集群各节点逐个进行查看,各节点执行命令:
./zkServer.sh status;
5、安装kafka集群:
以下操作在kafka集群各节点上对应执行。
(1)集群参数规划:
参数名 |
192.168.1.142 |
192.168.1.143 |
192.168.1.144 |
broker.id |
0 |
1 |
2 |
listeners |
PLAINTEXT://192.168.1.142:9092 |
PLAINTEXT://192.168.1.143:9092 |
PLAINTEXT://192.168.1.144:9092 |
log.dirs |
/data/soft/kafka/data |
/data/soft/kafka/data |
/data/soft/kafka/data |
zookeeper.connect |
192.168.1.142:2181, 192.168.1.143:2181, 192.168.1.144:2181 |
192.168.1.142:2181, 192.168.1.143:2181, 192.168.1.144:2181 |
192.168.1.142:2181, 192.168.1.143:2181, 192.168.1.144:2181 |
(2)软件下载:
下载地址:http://kafka.apache.org/downloads
(3)解压软件包:
将下载的软件包放置到安装路径下并进行解压。
(4)修改配置参数:
A、唯一标识符:
参数:broker.id,值为一个integer类型数字,集群内各节点此参数保证唯一。
B、监听地址:
参数:listeners,值为kafka访问地址。
例如:listeners=PLAINTEXT://192.168.1.140:9092(修改IP和端口号)
C、日志存储路径:
参数:log.dirs,值为日志存储路径。
D、Zookeeper访问地址:
参数:zookeeper.connect,值为zookeeper集群访问地址,集群各节点访问地址中间用英文逗号隔开。
(5)启动kafka集群:
将kafka集群各节点逐个进行启动,各节点执行命令:
进入kafka安装路径下bin中,执行命令:
./kafka-server-start.sh ../config/server.properties &
(6)停止kafka集群:
将kafka集群各节点逐个进行停止,各节点执行命令:
进入kafka安装路径下bin中,执行命令:
./kafka-server-stop.sh
6、测试kafka集群:
(1)创建一个topic:
进入kafka安装路径下bin中,执行命令:
./kafka-topics.sh --create --bootstrap-server 192.168.1.142:9092,192.168.1.143:9092,192.168.1.144:9092 --replication-factor 3 --partitions 3 --topic test
非常注意:Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。若读者依旧使用的是 2.1 及以下版本,请将下述的 --bootstrap-server 参数及其值手动替换为 --zookeeper zk1:2181,zk2:2181,zk:2181。一定要注意两者参数值所指向的集群地址是不同的。−−bootstrap-server参数指向kafka单机或集群,而--zookeeper参数指向zookeeper单机或集群。
(2)打开一个producer:
进入kafka安装路径下bin中,执行命令:
./kafka-console-producer.sh --broker-list 192.168.1.142:9092,192.168.1.143:9092,192.168.1.144:9092 --topic test
(3)打开一个consumer:
进入kafka安装路径下bin中,执行命令:
./kafka-console-consumer.sh--bootstrap-server 192.168.1.142:9092,192.168.1.143:9092,192.168.1.144:9092 --topic
(4)测试kafka集群消息:
然后在producer窗口写内容,就会在该topic中产生一条消息,在consumer窗口中看到该topic中新增加的消息,就算是消费了该topic中的一条消息。
7、测试集群高可用:
(1)测试zookeeper集群高可用:
将zookeeper集群中某个节点手动关闭掉,然后在kafka的生产端可以正常输入消息,在kafka消费端可以正常看到消息。
(2)测试kafka集群高可用:
将kafka集群中某个节点手动关闭掉,然后在kafka的生产端可以正常输入消息,在kafka消费端可以正常看到消息。