Kafka安装教程
Kafka集群部署
概述
之前的大数据集群主要是离线处理的方式对集群的数据进行开发处理。当前的集群数据量已经达到了PB级别了,离线数据获取主要是从数仓侧进行全量或者增量的方式导入大数据平台,部分是通过SFTP的方式解析进入大数据平台,少量数据是通过接口的方式准实时接入到大数据平台。随着业务的发展,对于实时数据的接入和应用显得越来越重要了,接下来的时间会一直更新整个时间数据接入和应用的分享。
我们都是知道kafak是一种分布式的,基于发布/订阅的消息系统。以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能、高吞吐率。这个是非常符合当前接入实时数据进行处理的一个组件。
为什么说kafka是非常适合我当前的场景呢?当前市场kafka的应用场景常见有:消息系统、跟踪网站活动、运营指标、日志聚合、流处理、采集日志、提交日志。kafka具有以下的以下优点(特性):
· 高吞吐、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
· 可扩展性:kafka 集群支持热扩展
· 可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
· 容错性:允许集群中节点失败
· 高并发:支持数千个客户端同时读写
即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。同时支持离线数据处理(hive、HBASE)和实时数据处理(spark、storm)。
话不多说,开启我实际安装过程。如下的安装步骤都是可以在虚拟机里面直接复制使用的。温馨提醒,如果是在实际的生产环境进行部署的话,请记得挂着磁盘。
希望对你们有帮助!!!其中标红的部分,新手要特别注意,大神们就忽略了。也欢迎大家留言指正!
一、集群规划
二、集群环境准备
以下步骤无特殊说明则各节点均需执行
1)关闭防火墙(三台)
# 需要用root用户执行
service iptables stop
chkconfig iptables off
2)关闭selinux(三台)
vim /etc/selinux/config
# 注释下面一行
#SELINUX=enforcing
# 添加下面一行
SELINUX=disabled
3)配置域名映射(三台)
vim /etc/hosts
# 在末尾添加以下几行,注意填写实际IP及hostname
IP01 hostname01 kafka01
IP02 hostname02 kafka02
IP03 hostname03 kafka03
重启机器
reboot -h now
4)配置免密登录
# 生成公钥与私钥, 执行以下命令后按三次回车键即可
ssh-****** -t rsa
所有节点拷贝公钥到kafka01
ssh-copy-id kafka01
复制kafka01认证到其他机器
在kafka01执行
scp /root/.ssh/authorized_keys kafka02:/root/.ssh
scp /root/.ssh/authorized_keys kafka03:/root/.ssh
若中间出错,100%后面不为1212 (404X3,n个节点为404Xn),可以通过机器互ping验证
5)时间同步
在kafka01执行
确认是否安装ntpd服务
rpm -qa | grep ntp
若未安装,在线安装
yum install -y ntp
设置ntpd服务开机启动
chkconfig ntpd on
修改配置文件
vim /etc/ntp.conf
# 添加下面一行,注意修改子网IP,如192.168.11.0
restrict 子网IP mask 255.255.255.0 nomodify notrap
# 注释下面四行
#server 0.centos.pool.ntp.org
#server 1.centos.pool.ntp.org
#server 2.centos.pool.ntp.org
#server 3.centos.pool.ntp.org
# 取消注释或添加下面两行
server 127.127.1.0 # local clock
fudge 127.127.1.0 stratum 10
配置以下内容,保证BIOS与系统时间同步
vim /etc/sysconfig/ntpd
# 添加下面一行
SYNC_HWLOCK=yes
在其他节点执行
配置另外几台与kafka01时间同步
crontab -e
*/1 * * * * /usr/sbin/ntpdate kafka01
6)安装jdk(三台)
查看并卸载自带的openjdk
rpm -qa | grep java
rpm -e java-1.6.0-openjdk-1.6.0.41-1.13.13.1.el6_8.x86_64 tzdata-java-2016j-1.el6.noarch java-1.7.0-openjdk-1.7.0.131-2.6.9.0.el6_8.x86_64 --kafkaps
创建文件夹
# 用于存放安装包
mkdir -p /export/software/
# 用于安装软件
mkdir -p /export/servers/
上传到/export/software/并解压jdk
cd /export/software/
tar -zxvf jdk-8u141-linux-x64.tar.gz -C ../servers/
配置环境变量
vim /etc/profile
# JAVA_HOME
export JAVA_HOME=/export/servers/jdk1.8.0_141
export PATH=:$JAVA_HOME/bin:$PATH
source /etc/profile
验证
java -version
结果显示如下
java version "1.8.0_141"
Java(TM) SE Runtime Environment (build 1.8.0_141-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.141-b15, mixed mode)
三、Zookeeper集群安装
1)下载安装包
下载完成后上传到集群上
2)解压安装包
cd /export/software/
tar -zxvf zookeeper-3.4.14.tar.gz -C ../servers/
3)创建软链接
cd /export/servers/
ln -s zookeeper-3.4.14 zookeeper
4)修改配置文件
cd /export/servers/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
mkdir -p /export/servers/zookeeper/zkdatas/
vim zoo.cfg
# 指定zk文件存储目录
dataDir=/export/servers/zookeeper/zkdatas
# 取消下面两行注释
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
# 添加下面三行
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888
5)添加myid配置
echo 1 > /export/servers/zookeeper/zkdatas/myid
6)分发安装包
scp -r /export/servers/zookeeper-3.4.14/ kafka02:/export/servers/
scp -r /export/servers/zookeeper-3.4.14/ kafka03:/export/servers/
7)其他机器创建软链接及修改myid值
ln -s /export/servers/zookeeper-3.4.14/ /export/servers/zookeeper
kafka02
echo 2 > /export/servers/zookeeper/zkdatas/myid
kafka03
echo 3 > /export/servers/zookeeper/zkdatas/myid
8)启动zookeeper集群
启动脚本如下
zookeeper-start.sh
#!/bin/bash
echo "1.启动节点kafka01、kafka02和kafka03......"
for n in kafka01 kafka02 kafka03
do
ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh start"
done
#休眠1秒
sleep 1
echo "2.查看集群各节点状态......"
for n in kafa01 kafka02 kafka03
do
ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh status"
done
停止脚本如下
zookeeper-stop.sh
#!/bin/bash
echo "1.停止节点kafka01、kafka02和kafka03......"
for n in kafka01 kafka02 kafka03
do
ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh stop"
done
#休眠1秒
sleep 1
echo "4. 查看集群各节点状态......"
for n in kafka01 kafka02 kafka03
do
ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh status"
done
四、kafka集群安装
1)下载安装包
下载完成后上传到集群上
2)解压安装包
cd /export/software/
tar -zxvf kafka_2.11-2.2.1.tgz -C ../servers/
3)创建软链接
ln -s kafka_2.11-2.2.1/ kafka
4)创建logs文件夹
cd /export/servers/kafka
mkdir logs
5)备份并修改配置文件
cd /export/servers/kafka/config
cp server.properties server.properties.bak
修改内容
vim server.properties
############################# Server Basics #############################
#broker的全局唯一编号,不能重复
broker.id=0
############################# Socket Server Settings #############################
# 监听列表
listeners=PLAINTEXT://kafka01:9092
# 处理网络请求的线程数, 一般等于核心数
num.network.threads=3
# 用来处理磁盘IO的线程数
num.io.threads=8
# socket 发送缓冲区
socket.send.buffer.bytes=102400
# socket 接收缓冲区
socket.receive.buffer.bytes=102400
# socket请求最大数值,防止serverOOM
socket.request.max.bytes=104857600
############################# Log Basics #############################
# kafka运行日志存放的路径
log.dirs=/export/servers/kafka/logs
# 默认分区数 会被命令行参数覆盖
num.partitions=3
# 用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# 内置主题 "__consumer_offsets" and "__transaction_state" 的副本数
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################## Log Retention Policy #############################
# topic数据默认保留时长, 10天
log.retention.hours=240
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# 一个消息长度, 超过再创建一个
log.segment.bytes=1073741824
# 文件大小检查周期
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# 配置连接Zookeeper集群地址
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
# zookeeper连接超时时间
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
############################# Other #############################
# 数据副本个数
default.replication.factor=3
# 单条消息最大长度4Mb
message.max.bytes=4000000
# broker可复制的消息的最大字节数。值应该比message.max.bytes大
replica.fetch.max.bytes=4194304
# 消费者能读取的最大消息,值应该大于或等于message.max.bytes
max.partition.fetch.bytes=4194304
# 允许删除主题
delete.topic.enable=true
6)各节点分别添加环境变量
vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/export/servers/kafka/
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
7)分发安装包
注意:kafka02和kafka03
scp -r /export/servers/kafka_2.11-2.2.1/ kafka02:/export/servers/
scp -r /export/servers/kafka_2.11-2.2.1/ kafka03:/export/servers/
8)其他机器创建软链接及修改配置文件
ln -s /export/servers/kafka_2.11-2.2.1 /export/servers/kafka
kafka02
broker.id=1
listeners=PLAINTEXT://kafka02:9092
kafka03
broker.id=2
listeners=PLAINTEXT://kafka03:9092
9)启动kafka集群
启动脚本如下
kafka-start.sh
#!/bin/bash
echo "正在启动kafka集群......"
for n in kafka01 kafka02 kafka03
do
ssh $n "source /etc/profile;/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties"
done
echo "启动kafka集群完成......"
停止脚本如下
kafka-stop.sh
#!/bin/bash
echo "正在关闭kafka集群......"
for n in kafka01 kafka02 kafka03
do
ssh $n "source /etc/profile;/export/servers/kafka/bin/kafka-server-stop.sh"
done
echo "关闭kafka集群完成......"
10)kafka测试
# 创建topic
kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 3 --partitions 3 --topic topic-test
# 查看topic详情
kafka-topics.sh --describe --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 localhost:2181 --topic topic-test
# 开启消息生产者
kafka-console-producer.sh --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic topic-test
# 开启消息消费者
kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --from-beginning --topic topic-test
在消息生产者发送消息消费者能收到即可