Kafka Cluster Installation Guide: Kafka 2.5.0 + Redhat 7.5 + Zookeeper
1. Overview
1.1 What Kafka is
1) Apache Kafka is an open-source messaging system written in Scala, developed as a project of the Apache Software Foundation.
2) Kafka was originally developed at LinkedIn and open-sourced in early 2011; it graduated from the Apache Incubator in October 2012. The project's goal is to provide a unified, high-throughput, low-latency platform for handling real-time data.
3) Kafka is a distributed message queue. Messages are organized by Topic; a message sender is called a Producer and a receiver a Consumer. A Kafka cluster consists of multiple Kafka instances, and each instance (server) is called a broker.
4) Both the Kafka cluster and its consumers rely on a Zookeeper cluster to store metadata and to keep the system available.
1.2 Kafka cluster
A Kafka cluster consists of multiple Kafka brokers, each with a unique ID (number). The brokers hold the topic log partitions. For fault tolerance you need at least three to five servers; Kafka clusters can scale to 10, 100, or 1,000 servers.
Kafka stores its cluster state in Zookeeper. A Zookeeper ensemble can serve requests only while a majority of its members are up, so the minimum configuration is three servers: of three, two are more than half, so one server may fail.
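The majority rule above can be sketched numerically: an ensemble of n servers keeps serving as long as more than n/2 are up, so it tolerates floor((n-1)/2) failures.

```shell
# Quorum math for a Zookeeper ensemble: n servers keep serving
# as long as more than n/2 are up, so floor((n-1)/2) may fail.
for n in 3 5 7; do
  echo "$n servers tolerate $(( (n - 1) / 2 )) failure(s)"
done
```

This is why three is the practical minimum: with two servers, losing one breaks the majority, so two servers tolerate zero failures.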
A Kafka cluster likewise needs at least three servers. This guide is based on a three-server deployment, planned as follows:
Resource planning and considerations

| Resource | Guidance | Recommendation |
| Memory | A system with 64GB of RAM is recommended; 32GB also works, while less than 32GB tends to perform poorly. Kafka uses heap space sparingly and does not need a heap larger than 6GB, which on a 32GB machine leaves up to 28-30GB for the filesystem cache. | 32GB or more |
| CPU | Most Kafka deployments are not particularly CPU-bound, so the processor matters less than the other resources, although CPU requirements can rise if SSL is enabled (the exact impact depends on the CPU type and JVM implementation). Prefer a modern processor with multiple cores; given a choice between a faster clock and more cores, choose more cores, since the extra concurrency they provide far outweighs a slightly faster clock speed. | Multi-core CPU |
| Storage | To ensure good latency, do not share the drives holding Kafka data with application logs or other OS filesystem activity. The drives can either be combined into a single RAID volume or each formatted and mounted as its own data directory. Because Kafka replicates data itself, RAID's redundancy can also be provided at the application level, and the choice involves trade-offs: (1) with multiple data directories, the broker places each new partition in the directory currently holding the fewest partitions, and a partition lives entirely in one directory, so uneven data across partitions can unbalance disk load; (2) RAID balances load across disks better because it works at a lower level, at the cost of reduced usable space, and it can tolerate disk failure, but avoid RAID 5/6, which severely hurt write throughput and raise the I/O cost of rebuilding the array after a disk failure (rebuild cost applies to any RAID level but is worst for RAID 5/6); (3) if the extra cost is acceptable, use RAID 10, otherwise configure multiple log directories, each mounted on its own drive; (4) avoid network-attached storage (NAS), which is typically slower, has higher and more variable latency, and is a single point of failure; (5) disk throughput (IOPS) affects producer performance, since produced messages must be committed on the server and most clients wait until at least one broker confirms the commit, so faster disk writes mean lower produce latency. | Dedicated storage, sized to the data volume; 500GB suggested |
| Network | A fast, reliable network is an essential performance component of any distributed system: low latency keeps nodes communicating easily, while high bandwidth helps partition movement and recovery. Modern datacenter networking (1 GbE, 10 GbE) is sufficient for the vast majority of clusters. Network throughput caps the maximum data volume Kafka can handle and, together with disk, is the main constraint on scaling; producers writing and consumers reading share the bandwidth, and cluster replication consumes significant network as well. | 1 GbE or faster |
| Filesystem | Kafka has no special filesystem requirements; XFS, ext4, or NTFS can all run Kafka. | ext4 or XFS |
Server planning
Hostname and IP plan

| Server | IP | Roles |
| prdserver1 | 192.168.88.115 | Zookeeper server1, Kafka server1 |
| prdserver2 | 192.168.88.117 | Zookeeper server2, Kafka server2 |
| prdserver3 | 192.168.88.119 | Zookeeper server3, Kafka server3 |
There are no special operating system requirements; a recent Linux release is recommended to reduce the chance of hitting known bugs.

| OS version | Red Hat Enterprise Linux Server release 7.5 (Maipo) |
| Component | Directory | Permissions |
| Zookeeper | /opt/zookeeper/ | |
| Kafka | /opt/kafka | |
| Group | User | Description |
| zk | zk | Zookeeper installation user and group |
| kafka | kafka | Kafka installation user and group |
3. Installation
Installing the Zookeeper cluster
All three servers form the Zookeeper cluster, so the following installation steps must be performed on all three servers.
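A minimal sketch of fanning the same steps out to all three nodes (hostnames from the plan above; in practice each echo would be an ssh invocation):

```shell
# The same installation steps run on every node; in a real rollout each
# echo below would be e.g.: ssh "$host" '<install commands>'
for host in prdserver1 prdserver2 prdserver3; do
  echo "run install steps on $host"
done
```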
JDK installation and base environment
Version notes
For security, use the latest JDK 1.8 release, since earlier free versions have disclosed security vulnerabilities. LinkedIn currently runs JDK 1.8 u5 (with plans to upgrade to a newer version) using the G1 collector, tuned as follows:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
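These flags are not applied by editing Kafka's scripts; the stock launch scripts read them from environment variables (KAFKA_HEAP_OPTS for heap sizing, KAFKA_JVM_PERFORMANCE_OPTS for GC tuning). A sketch of applying the tuning above before starting a broker:

```shell
# Heap sizing goes in KAFKA_HEAP_OPTS; GC tuning goes in
# KAFKA_JVM_PERFORMANCE_OPTS. Both are picked up by kafka-run-class.sh
# when kafka-server-start.sh launches the broker.
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g -XX:MetaspaceSize=96m"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M"
echo "$KAFKA_HEAP_OPTS"
# then: ./kafka-server-start.sh -daemon ../config/server.properties
```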
JDK download and installation
JDK download page:
https://www.oracle.com/java/technologies/javase-downloads.html
#mkdir -p /usr/java
#mv /tmp/jdk-8u241-linux-x64.tar /usr/java
#cd /usr/java
#tar -xvf jdk-8u241-linux-x64.tar
#cd /usr/java/jdk1.8.0_241
#ls -l
total 25988
drwxr-xr-x. 2 10143 10143 4096 Dec 11 18:35 bin
-r--r--r--. 1 10143 10143 3244 Dec 11 18:35 COPYRIGHT
drwxr-xr-x. 3 10143 10143 132 Dec 11 18:35 include
-rw-r--r--. 1 10143 10143 5217333 Dec 11 15:41 javafx-src.zip
drwxr-xr-x. 5 10143 10143 185 Dec 11 18:35 jre
drwxr-xr-x. 5 10143 10143 245 Dec 11 18:35 lib
-r--r--r--. 1 10143 10143 44 Dec 11 18:35 LICENSE
drwxr-xr-x. 4 10143 10143 47 Dec 11 18:35 man
-r--r--r--. 1 10143 10143 159 Dec 11 18:35 README.html
-rw-r--r--. 1 10143 10143 424 Dec 11 18:35 release
-rw-r--r--. 1 10143 10143 21078837 Dec 11 18:35 src.zip
-rw-r--r--. 1 10143 10143 116400 Dec 11 15:41 THIRDPARTYLICENSEREADME-JAVAFX.txt
-r--r--r--. 1 10143 10143 169788 Dec 11 18:35 THIRDPARTYLICENSEREADME.txt
Configuring the JDK environment variables

JDK deployment directory: /usr/java/jdk1.8.0_241

Append the following to /etc/profile (vi /etc/profile):

JAVA_HOME=/usr/java/jdk1.8.0_241
CLASSPATH=.:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
PATH=$PATH:${JAVA_HOME}/bin
export JAVA_HOME CLASSPATH PATH

Save, exit, and reload:

source /etc/profile

Unless the customer specifies otherwise, deploy the JDK under /usr/java (create the directory if it does not exist). The JDK can instead be configured only for the zookeeper and kafka installation users by adding the same lines to <user_home>/.bash_profile.
Check the JDK version

# java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)
Add the user and group

# groupadd zk
# useradd -g zk zk
# passwd zk
Create the file paths
Run as root:
#chmod 777 /opt
#su - zk
#cd /opt
Download Zookeeper
Download the software:
https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
Upload and extract
Upload apache-zookeeper-3.6.1-bin.tar.gz to /opt/software on the server
#tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
#mv apache-zookeeper-3.6.1-bin /opt/zookeeper/zookeeper-3.6.1
#cd /opt/zookeeper/zookeeper-3.6.1/conf
#ll
total 12
-rw-r--r--. 1 zk zk 535 Apr 21 22:59 configuration.xsl
-rw-r--r--. 1 zk zk 3435 Apr 21 22:59 log4j.properties
-rw-r--r--. 1 zk zk 1148 Apr 21 22:59 zoo_sample.cfg
#zoo_sample.cfg is the sample configuration file shipped with Zookeeper. Copy it to zoo.cfg, which is the filename Zookeeper expects.
#cp zoo_sample.cfg zoo.cfg
Edit zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper/zkdata
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
dataLogDir=/opt/zookeeper/zkdatalog
server.1=192.168.88.115:2888:3888
server.2=192.168.88.117:2888:3888
server.3=192.168.88.119:2888:3888
#The number of snapshots to retain in dataDir
autopurge.snapRetainCount=20
#Purge task interval in hours
#Set to "0" to disable auto purge feature
autopurge.purgeInterval=48
Notes:
#In server.1, the 1 is this server's identifier (other numbers work too); it marks which server in the ensemble this is, and the same number must be written into the myid file under the snapshot directory (dataDir).
#192.168.88.115 is the node's IP within the cluster. The first port (default 2888) carries communication between the leader and the followers; the second port (default 3888) is used for leader election, both when the cluster first starts and whenever the current leader fails.
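A quick sanity check for the server.N lines, run here against a temporary copy of the entries above (IPs and ports as planned in this guide):

```shell
# Validate that each ensemble entry has the expected form
# server.<id>=<ip>:<quorum-port>:<election-port>; prints the match count.
cfg=$(mktemp)
cat > "$cfg" <<'EOF'
server.1=192.168.88.115:2888:3888
server.2=192.168.88.117:2888:3888
server.3=192.168.88.119:2888:3888
EOF
grep -Ec '^server\.[0-9]+=([0-9]{1,3}\.){3}[0-9]{1,3}:2888:3888$' "$cfg"
rm -f "$cfg"
```

All three lines match, so the command prints 3; on a real node, point the grep at /opt/zookeeper/zookeeper-3.6.1/conf/zoo.cfg instead.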
Create the myid file
#mkdir -p /opt/zookeeper/zkdata/
#mkdir -p /opt/zookeeper/zkdatalog
Create the myid file on each of the three servers:
#on prdserver1
#echo "1" > /opt/zookeeper/zkdata/myid
#on prdserver2
#echo "2" > /opt/zookeeper/zkdata/myid
#on prdserver3
#echo "3" > /opt/zookeeper/zkdata/myid
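Instead of typing each value by hand, the per-host id can be derived from the IP-to-id mapping in the server.N lines; a portable sketch:

```shell
# Derive this node's myid from its IP (mapping taken from the zoo.cfg above);
# on a real node the result would be written to /opt/zookeeper/zkdata/myid.
ip=192.168.88.115   # substitute this server's own IP
case "$ip" in
  192.168.88.115) id=1 ;;
  192.168.88.117) id=2 ;;
  192.168.88.119) id=3 ;;
esac
echo "$id"          # e.g. echo "$id" > /opt/zookeeper/zkdata/myid
```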
Log file configuration
#cd /opt/zookeeper/zookeeper-3.6.1
#mkdir logs
#vi /opt/zookeeper/zookeeper-3.6.1/conf/log4j.properties
zookeeper.root.logger=INFO, CONSOLE
zookeeper.console.threshold=INFO
zookeeper.log.dir=/opt/zookeeper/zookeeper-3.6.1/logs
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=INFO
zookeeper.log.maxfilesize=256MB
zookeeper.log.maxbackupindex=20
zookeeper.tracelog.dir=${zookeeper.log.dir}
zookeeper.tracelog.file=zookeeper_trace.log
……
Start the service and check its status
#change to the Zookeeper bin directory
cd /opt/zookeeper/zookeeper-3.6.1/bin
#start the service (on all 3 servers)
./zkServer.sh start
----------------------------------------------------------
[[email protected] bin]$ ./zkServer.sh start
/usr/local/java/jdk1.8/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[[email protected] bin]$ ./zkServer.sh status
/usr/local/java/jdk1.8/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
[[email protected] bin]$ ./zkServer.sh status
/usr/local/java/jdk1.8/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
[[email protected] bin]$ ./zkServer.sh status
/usr/local/java/jdk1.8/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
Check the zookeeper process
[[email protected] bin]$ ps -ef |grep zookeeper
zk 2216 1 0 15:39 pts/0 00:00:03 java -Dzookeeper.log.dir=/opt/zookeeper/zookeeper-3.6.1/bin/../logs -Dzookeeper.log.file=zookeeper-zk-server-prdserver1.log -Dzookeeper.root.logger=INFO,CONSOLE -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=kill -9 %p -cp /opt/zookeeper/zookeeper-3.6.1/bin/../zookeeper-server/target/classes:/opt/zookeeper/zookeeper-3.6.1/bin/../build/classes:/opt/zookeeper/zookeeper-3.6.1/bin/../zookeeper-server/target/lib/*.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../build/lib/*.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/zookeeper-prometheus-metrics-3.6.1.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/zookeeper-jute-3.6.1.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/zookeeper-3.6.1.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/snappy-java-1.1.7.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/slf4j-log4j12-1.7.25.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/slf4j-api-1.7.25.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/simplec
Zookeeper installation is now complete.
Kafka cluster installation
JDK installation (as described in the Zookeeper section above)
Users and groups

| Group | User | Description |
| kafka | kafka | Domain or application user and group |

# groupadd kafka
# useradd -g kafka kafka
# passwd kafka

Run as root, then switch to the kafka user:
su - kafka
cd /opt
mkdir kafka
mkdir -p /opt/kafka/kafkalogs/
Download Kafka
#Download the software
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.0/kafka_2.12-2.5.0.tgz
#Upload and extract
Upload kafka_2.12-2.5.0.tgz to /opt/software on the server
[[email protected] software]$ tar -zxvf kafka_2.12-2.5.0.tgz
[[email protected] software]$ mv kafka_2.12-2.5.0 /opt/kafka/kafka2.5.0
The file to focus on is server.properties. The config directory contains many files, including Zookeeper configuration: Kafka could be started with its bundled Zookeeper, but this guide uses the standalone Zookeeper cluster installed above.
Parameter reference:
broker.id=0 #unique ID of this broker within the cluster, analogous to Zookeeper's myid
port=9092 #port on which this Kafka broker serves clients; default 9092
host.name=192.168.88.115 #disabled by default; version 0.8.1 had a DNS-resolution bug affecting it
num.network.threads=3 #number of threads the broker uses for network processing
num.io.threads=8 #number of threads the broker uses for disk I/O
log.dirs=/opt/kafka/kafkalogs/ #directory where messages are stored; can be a comma-separated list of directories, in which case num.io.threads should be no smaller than the number of directories, and each new partition is placed in whichever listed directory currently holds the fewest partitions
socket.send.buffer.bytes=102400 #send buffer size; data is accumulated in the buffer and sent once it reaches a certain size, which improves performance
socket.receive.buffer.bytes=102400 #receive buffer size; data is flushed to disk once enough has accumulated
socket.request.max.bytes=104857600 #maximum size of a request sent to or fetched from Kafka; must not exceed the JVM heap size
num.partitions=1 #default number of partitions per topic
log.retention.hours=168 #default maximum message retention: 168 hours (7 days)
message.max.bytes=5242880 #maximum message size: 5MB
default.replication.factor=2 #number of replicas Kafka keeps of each partition; if one replica fails, another can continue serving
replica.fetch.max.bytes=5242880 #maximum number of bytes fetched per replica request
log.segment.bytes=1073741824 #messages are appended to segment files on disk; when a segment exceeds this size, Kafka rolls a new file
log.retention.check.interval.ms=300000 #every 300000 ms (5 minutes), check whether any segment has exceeded the configured retention (log.retention.hours=168) and delete expired messages
log.cleaner.enable=false #whether to enable the log cleaner (log compaction); leave it disabled unless compacted topics are needed
zookeeper.connect=192.168.88.115:2181,192.168.88.117:2181,192.168.88.119:2181 #Zookeeper connection string
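The retention numbers above can be sanity-checked with a little arithmetic:

```shell
# log.retention.hours=168 is exactly one week; the expiry check at
# log.retention.check.interval.ms=300000 runs every 5 minutes.
hours=168
ms=300000
echo "retention: $((hours / 24)) days"
echo "expiry check every $((ms / 1000 / 60)) minutes"
```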
Using node 192.168.88.115 as an example, go to the config directory:
cd /opt/kafka/kafka2.5.0/config
vi server.properties
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0 (note: set to 1 on 192.168.88.117 and to 2 on 192.168.88.119)
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.88.115:9092 (note: adjust the IP on the other two nodes)
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.88.115:9092 (note: adjust the IP on the other two nodes)
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=/opt/kafka/kafkalogs/
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=localhost:2181
zookeeper.connect=192.168.88.115:2181,192.168.88.117:2181,192.168.88.119:2181
Modify the consumer configuration file
vi consumer.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
#bootstrap.servers=localhost:9092
bootstrap.servers=192.168.88.115:9092,192.168.88.117:9092,192.168.88.119:9092
# consumer group id
group.id=test-consumer-group
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=
Modify the producer configuration file
[[email protected] config]$ vi producer.properties
……
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
#bootstrap.servers=localhost:9092
bootstrap.servers=192.168.88.115:9092,192.168.88.117:9092,192.168.88.119:9092
……
Start the Kafka cluster and test it
Starting Kafka
Start Kafka in the background on all 3 servers:
Go to Kafka's bin directory and run the start command.
[[email protected] bin]$ cd /opt/kafka/kafka2.5.0/bin
[[email protected] bin]$ ./kafka-server-start.sh -daemon ../config/server.properties
[[email protected] bin]$ ps -ef |grep kafka
kafka 5218 1 8 17:48 pts/1 00:00:02 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/opt/kafka/kafka2.5.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/kafka2.5.0/bin/../logs -Dlog4j.configuration=file:./../config/log4j.properties -cp .:/usr/local/java/jdk1.8/lib:/usr/local/java/jdk1.8/jre/lib:/opt/kafka/kafka2.5.0/bin/../libs/activation-1.1.1.jar:/opt/kafka/kafka2.5.0/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/commons-cli-1.4.jar:/opt/kafka/kafka2.5.0/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/kafka2.5.0/bin/../libs/connect-api-2.5.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/connect-basic-auth-extension-2.5.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/connect-file-2.5.0.
Checking the Kafka logs
[[email protected] logs]$ cd /opt/kafka/kafka2.5.0/logs
[[email protected] logs]$ tail -f server.log
[2020-07-29 18:04:33,020] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-07-29 18:04:33,027] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-07-29 18:04:33,051] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2020-07-29 18:04:33,094] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-07-29 18:04:33,151] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2020-07-29 18:04:33,170] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2020-07-29 18:04:33,175] INFO Kafka version: 2.5.0 (org.apache.kafka.common.utils.AppInfoParser)
[2020-07-29 18:04:33,175] INFO Kafka commitId: 66563e712b0b9f84 (org.apache.kafka.common.utils.AppInfoParser)
[2020-07-29 18:04:33,175] INFO Kafka startTimeMs: 1596017073171 (org.apache.kafka.common.utils.AppInfoParser)
[2020-07-29 18:04:33,177] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Test cluster message production and consumption
Create a topic (First_kafka_project).
[[email protected] bin]$ pwd
/opt/kafka/kafka2.5.0/bin
[[email protected] bin]$ ./kafka-topics.sh --create --zookeeper 192.168.88.115:2181 --replication-factor 2 --partitions 3 --topic First_kafka_project
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic First_kafka_project.
#Explanation
--replication-factor 2 #keep two replicas of each partition
--partitions 3 #create 3 partitions
--topic #topic name: First_kafka_project
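With these options, the cluster stores partitions × replication-factor partition replicas in total; spread over the three brokers, that is two replicas per broker:

```shell
# Storage footprint of the topic created above:
# 3 partitions x replication factor 2 = 6 partition replicas.
partitions=3
rf=2
brokers=3
echo "total replicas: $((partitions * rf))"
echo "replicas per broker (evenly spread): $((partitions * rf / brokers))"
```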
Create a producer on 192.168.88.117 and send some test messages.
[[email protected] bin]$ ./kafka-console-producer.sh --broker-list 192.168.88.117:9092 --topic First_kafka_project
>test
>test
>myfirst project
>today is Tuesday!
>end of the message
>
Create a consumer on 192.168.88.119 and check that the messages are received.
[[email protected] bin]$ ./kafka-console-consumer.sh --bootstrap-server 192.168.88.119:9092 --topic First_kafka_project --from-beginning
test
test
myfirst project
today is Tuesday!
today is Tuesday!
end of the message
The messages arrive on 192.168.88.119, so message production and consumption both work.
Kafka cluster high-availability test
Create a topic (Second_kafka_project)
Inspect its details
For example, check the status of the Second_kafka_project topic:
[[email protected] bin]$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic Second_kafka_project
Topic: Second_kafka_project PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: Second_kafka_project Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: Second_kafka_project Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: Second_kafka_project Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
From this output, partition 0's leader is on broker.id=1, its replicas live on brokers 1, 2 and 0, and all replicas are alive and in sync with the leader on broker 1.
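A small sketch for pulling the leader of each partition out of saved --describe output (sample lines copied from above):

```shell
# Extract the broker id following each "Leader:" token in
# kafka-topics.sh --describe output; fields are whitespace-separated.
out='Topic: Second_kafka_project Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: Second_kafka_project Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: Second_kafka_project Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2'
echo "$out" | awk '{for (i = 1; i <= NF; i++) if ($i == "Leader:") print $(i+1)}'
```

For the sample above this prints 1, 2 and 0, one leader id per partition, which makes it easy to spot when every leader has moved off a killed broker.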
Now kill the Kafka process on broker.id=1 (192.168.88.117). The expected result is that broker 1 no longer leads any partition and its replicas drop out of the in-sync set.
[[email protected] bin]$ ps -ef |grep kafka
kafka 5136 1 2 18:04 pts/0 00:02:04 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/opt/kafka/kafka2.5.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:Numb
[[email protected] bin]$ kill -9 5136
[[email protected] bin]$ ps -ef |grep 5136
kafka 9976 3429 0 19:22 pts/0 00:00:00 grep --color=auto 5136
[[email protected] bin]$
Check the topic status again: node 1 no longer leads any partition and has dropped out of every ISR.
[[email protected] bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.88.117:2181 --topic Second_kafka_project
Topic: Second_kafka_project PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: Second_kafka_project Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
Topic: Second_kafka_project Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0
Topic: Second_kafka_project Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,2
Test whether messages can still be produced and consumed
Results below: messages are produced and consumed normally; the consumer goes through a brief failover.
[[email protected] bin]$ ./kafka-console-producer.sh --broker-list 192.168.88.117:9092 --topic Second_kafka_project
>test second topic!
>before kill 192.168.88.117....
>after kill 192.168.88.117....
>Test send message again after kill broker1 1 minutes later
>
[[email protected] bin]$ ./kafka-console-consumer.sh --bootstrap-server 192.168.88.119:9092 --topic Second_kafka_project --from-beginning
test second topic!
before kill 192.168.88.117....
[2020-07-29 19:21:21,057] WARN [Consumer clientId=consumer-console-consumer-15949-1, groupId=console-consumer-15949] Connection to node 1 (/192.168.88.117:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
…..
[2020-07-29 19:21:34,428] WARN [Consumer clientId=consumer-console-consumer-15949-1, groupId=console-consumer-15949] Connection to node 1 (/192.168.88.117:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
after kill 192.168.88.117....
Test send message again after kill broker1 1 minutes later
Once the cluster is installed, its parameters can be tuned to the specific project's requirements.