消息队列——kafka——搭建集群(linux)

1、集群规划:

应用

192.168.1.142

192.168.1.143

192.168.1.145

Jdk8

Jdk8

Jdk8

Jdk8

Zookeeper3.4.10

Master

Slave1

Slave2

kafka2.12

Kafka2.12

kafka2.12

kafka2.12

Kafka-manager2.0

Kafka-manager

2、集群各节点配置jdk环境:

下载1.8及以上版本的jdk,在集群各个节点上配置好jdk环境变量。

3、集群各节点关闭防火墙:

关闭集群各个节点的防火墙,强烈建议如此,如果不能关闭防火墙,那么需要将zookeeper集群节点通讯端口、kafka集群访问端口等端口信息逐个开通。本次操作直接关闭防火墙。

4、安装zookeeper集群:

以下操作在zookeeper集群各节点上对应执行。

(1)集群参数规划:

参数名

192.168.1.142

192.168.1.143

192.168.1.144

dataDir

/data/soft/zookeeper/data

/data/soft/zookeeper/data

/data/soft/zookeeper/data

clientPort

2181

2181

2181

server.A

server.0=192.168.1.142:2888:3888

server.1=192.168.1.143:2888:3888

server.2=192.168.1.144:2888:3888

server.0=192.168.1.142:2888:3888

server.1=192.168.1.143:2888:3888

server.2=192.168.1.144:2888:3888

server.0=192.168.1.142:2888:3888

server.1=192.168.1.143:2888:3888

server.2=192.168.1.144:2888:3888

文件:myid

0

1

2

(2)软件下载:

下载地址:https://zookeeper.apache.org/releases.html#download

消息队列——kafka——搭建集群(linux)

(3)解压软件包:

将下载的软件包放到安装路径下进行解压。

(4)配置文件重命名:

将解压后的文件夹中config内zoo_sample.cfg重命名成zoo.cfg

(5)修改配置参数:

A、数据存储路径:

参数:dataDir,配置为zookeeper数据存储路径。

B、服务端口号:

参数:clientPort,值为连接zookeeper服务的端口号。

C、集群各节点通信地址:

参数:server.A=B:C:D,值为zookeeper集群各节点通讯地址。

ABCD意义如下:

A:其中 A 是一个数字,表示这个是服务器的编号,与myid文件保持一致;

B:是这个服务器的 ip 地址;

C:Leader选举的端口;

D:Zookeeper服务器之间的通信端口。

例如:

server.0=192.168.1.142:2888:3888

server.1=192.168.1.143:2888:3888

server.2=192.168.1.144:2888:3888

(6)创建myid文件:

在配置的数据存储路径参数dataDir路径下创建myid文件,文件中写上该zookeeper集群节点的编号,编号就是配置的server.A中的A值。

(7)配置环境变量:

配置环境变量ZOOKEEPER_HOME,值为zookeeper软件解压路径。然后在path中添加%ZOOKEEPER_HOME%/bin;

(8)启动zookeeper集群:

将zookeeper集群各节点逐个进行启动,各节点执行命令:

./zkServer.sh start;

(9)停止zookeeper集群:

将zookeeper集群各节点逐个进行停止,各节点执行命令:

./zkServer.sh stop;

(10)查看zookeeper集群:

将zookeeper集群各节点逐个进行查看,各节点执行命令:

./zkServer.sh status;

5、安装kafka集群:

以下操作在kafka集群各节点上对应执行。

(1)集群参数规划:

参数名

192.168.1.142

192.168.1.143

192.168.1.144

broker.id

0

1

2

listeners

PLAINTEXT://192.168.1.142:9092

PLAINTEXT://192.168.1.143:9092

PLAINTEXT://192.168.1.144:9092

log.dirs

/data/soft/kafka/data

/data/soft/kafka/data

/data/soft/kafka/data

zookeeper.connect

192.168.1.142:2181,

192.168.1.143:2181,

192.168.1.144:2181

192.168.1.142:2181,

192.168.1.143:2181,

192.168.1.144:2181

192.168.1.142:2181,

192.168.1.143:2181,

192.168.1.144:2181

(2)软件下载:

下载地址:http://kafka.apache.org/downloads

消息队列——kafka——搭建集群(linux)

(3)解压软件包:

将下载的软件包放置到安装路径下并进行解压。

(4)修改配置参数:

A、唯一标识符:

参数:broker.id,值为一个integer类型数字,集群内各节点此参数保证唯一。

B、监听地址:

参数:listeners,值为kafka访问地址。

例如:listeners=PLAINTEXT://192.168.1.140:9092(修改IP和端口号)

C、日志存储路径:

参数:log.dirs,值为日志存储路径。

D、Zookeeper访问地址:

参数:zookeeper.connect,值为zookeeper集群访问地址,集群各节点访问地址中间用英文逗号隔开。

(5)启动kafka集群:

将kafka集群各节点逐个进行启动,各节点执行命令:

进入kafka安装路径下bin中,执行命令:

./kafka-server-start.sh ../config/server.properties &

(6)停止kafka集群:

将kafka集群各节点逐个进行停止,各节点执行命令:

进入kafka安装路径下bin中,执行命令:

./kafka-server-stop.sh

6、测试kafka集群:

(1)创建一个topic:

进入kafka安装路径下bin中,执行命令:

./kafka-topics.sh --create --bootstrap-server 192.168.1.142:9092,192.168.1.143:9092,192.168.1.144:9092 --replication-factor 3 --partitions 3 --topic test

非常注意:Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。若读者依旧使用的是 2.1 及以下版本,请将下述的 --bootstrap-server 参数及其值手动替换为 --zookeeper zk1:2181,zk2:2181,zk:2181。一定要注意两者参数值所指向的集群地址是不同的。−−bootstrap-server参数指向kafka单机或集群,而--zookeeper参数指向zookeeper单机或集群。

(2)打开一个producer:

进入kafka安装路径下bin中,执行命令:

./kafka-console-producer.sh --broker-list 192.168.1.142:9092,192.168.1.143:9092,192.168.1.144:9092 --topic test

(3)打开一个consumer:

进入kafka安装路径下bin中,执行命令:

./kafka-console-consumer.sh--bootstrap-server 192.168.1.142:9092,192.168.1.143:9092,192.168.1.144:9092 --topic

(4)测试kafka集群消息:

然后在producer窗口写内容,就会在该topic中产生一条消息,在consumer窗口中看到该topic中新增加的消息,就算是消费了该topic中的一条消息。

7、测试集群高可用:

(1)测试zookeeper集群高可用:

将zookeeper集群中某个节点手动关闭掉,然后在kafka的生产端可以正常输入消息,在kafka消费端可以正常看到消息。

(2)测试kafka集群高可用:

将kafka集群中某个节点手动关闭掉,然后在kafka的生产端可以正常输入消息,在kafka消费端可以正常看到消息。