Kafka集群部署

一、Kafka下载说明

Kafka中文官网

https://www.orchome.com/5

Apache kafka 官方下载。
https://kafka.apache.org/downloads.html

Kafka集群部署

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz

1、什么是kafka的优势?它主要应用于2大类应用:

  1. 构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。
  2. 构建实时流的应用程序,对数据流进行转换或反应。

2、首先几个概念:

  1. kafka作为一个集群运行在一个或多个服务器上。
  2. kafka集群存储的消息是以topic为类别记录的。
  3. 每个消息(也叫记录record,我习惯叫消息)是由一个key,一个value和时间戳构成。

3、kafka有四个核心API:

  • 应用程序使用 Producer API 发布消息到1个或多个topic(主题)。
  • 应用程序使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。
  • 应用程序使用 Streams API 充当一个流处理器,从1个或多个topic消费输入流,并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。
  • Connector API允许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,一个关系数据库的连接器可捕获每一个变化。

二、Kafka所使用的基本术语:

1、Topic

Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).

2、Producer

发布消息的对象称之为主题生产者(Kafka topic producer)

3、Consumer

订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)

4、Broker

已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

 

5、小结

新词汇:偏移量(offset)

 

优势:与大多数消息系统比较,kafka有更好的吞吐量,内置分区,副本和故障转移,这有利于处理大规模的消息。

 

使用场景:用户的活动追踪,网站的活动

 

三、安装zookeeper集群

#Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。

1、环境规划

192.168.2.223   ZooKeeper+Kafka

192.168.2.225   ZooKeeper+Kafka

192.168.2.224   ZooKeeper+Kafka

2、同步/etc/hosts文件

192.168.2.223   ZooKeeper-Kafka-01    

192.168.2.225   ZooKeeper-Kafka-02       

192.168.2.224   ZooKeeper-Kafka-03

3、配置文件与说明

在master上操作

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

vim /etc/hosts

tar xf zookeeper-3.4.10.tar.gz -C /usr/local/

cd /usr/local/

mv zookeeper-3.4.10/ zookeeper

chown -R root.root zookeeper/

cd zookeeper/conf/

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/tmp/zookeeper

clientPort=2181

server.1=ZooKeeper-Kafka-01:2888:3888

server.2=ZooKeeper-Kafka-02:2888:3888

server.3=ZooKeeper-Kafka-03:2888:3888

#

2888端口号是zookeeper服务之间通信的端口

3888端口是zookeeper与其他应用程序通信的端口。

4、创建工作目录:

mkdir /tmp/zookeeper

touch /tmp/zookeeper/myid

echo 1 > /tmp/zookeeper/myid

5、同步相关配置信息

scp -r /usr/local/zookeeper/ 192.168.2.225:/usr/local/

scp -r /usr/local/zookeeper/ 192.168.2.224:/usr/local/

6、在两个slave节点创建目录和文件

ZooKeeper-Kafka-02节点

mkdir /tmp/zookeeper

touch /tmp/zookeeper/myid

echo 2 > /tmp/zookeeper/myid

 

ZooKeeper-Kafka-03节点

mkdir /tmp/zookeeper

touch /tmp/zookeeper/myid

echo 3 > /tmp/zookeeper/myid

7、安装jdk环境并启动 zookeeper集群

#启动之后查看状态(注意标绿的地方表示各节点身份):

 

ZooKeeper-Kafka-03节点

yum -y install java java-devel

cd /usr/local/zookeeper/bin/

[[email protected] bin]# ./zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[[email protected] bin]# ./zkServer.sh status

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Mode: follower

 

ZooKeeper-Kafka-02节点

yum -y install java java-devel

cd /usr/local/zookeeper/bin/

[[email protected] bin]# ./zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[[email protected] bin]# ./zkServer.sh status

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Mode: Mode: leader

 

ZooKeeper-Kafka-01节点

yum -y install java java-devel

cd /usr/local/zookeeper/bin/

[[email protected] bin]# ./zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[[email protected] bin]# ./zkServer.sh status

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Mode: follower

 

8、报错:

启动报错如下:

Error contacting service. It is probably not running.

执行启动命令后在zookeeper目录下会生成一个zookeeper.out文件,查看内容显示如下:

nohup: 无法运行命令"java": 没有那个文件或目录

原因是zookeeper需要依赖java环境。

解决办法:

yum -y install java java-devel

四、Kafka集群安装配置

1、修改配置文件

#master上操作

cd

tar xf kafka_2.12-2.3.0.tgz -C /usr/local/

cd /usr/local/

mv kafka_2.12-2.3.0/ kafka

cd kafka/config

vim server.properties

cp server.properties server.properties.bak

>server.properties

vim server.properties

# master为0

broker.id=0

listeners=PLAINTEXT://ZooKeeper-Kafka-01:9092

advertised.listeners=PLAINTEXT://ZooKeeper-Kafka-01:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=5

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=24

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

# 连接

zookeeper.connect=ZooKeeper-Kafka-01:2181,ZooKeeper-Kafka-02:2181,ZooKeeper-Kafka-03:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

# 可删除topic

delete.topic.enable=true

2、同步配置文件

scp -r /usr/local/kafka/ 192.168.2.225:/usr/local/

scp -r /usr/local/kafka/ 192.168.2.224:/usr/local/

3、修改对应的broker.id和listenrs

ZooKeeper-Kafka-02节点

cat server.properties

broker.id=1

listeners=PLAINTEXT://ZooKeeper-Kafka-02:9092

advertised.listeners=PLAINTEXT://ZooKeeper-Kafka-02:9092

 

ZooKeeper-Kafka-03节点

cat server.properties

broker.id=2

listeners=PLAINTEXT://ZooKeeper-Kafka-03:9092

advertised.listeners=PLAINTEXT://ZooKeeper-Kafka-03:9092

4、启动服务

cd /usr/local/kafka

./bin/kafka-server-start.sh config/server.properties &

 

[[email protected] kafka]# jps

16965 QuorumPeerMain

17495 Kafka

18026 Jps

查看所有节点端口:

Kafka集群部署

Kafka集群部署

Kafka集群部署

#只有zookeeper角色是leader才多一个2888端口。

五、Zookeeper+Kafka集群测试

1、topic

创建topic

bin/kafka-topics.sh --create --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181 --replication-factor 3 --partitions 3 --topic test
显示topic:

bin/kafka-topics.sh --describe --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181 --topic test

Kafka集群部署

列出topic

bin/kafka-topics.sh --list --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181

Kafka集群部署

2、producer

创建 producer(生产者);

# 在master节点上 测试生产消息

bin/kafka-console-producer.sh --broker-list ZooKeeper-Kafka-01:9092 -topic test

Kafka集群部署

 

3、consumer

创建 consumer(消费者):

# ZooKeeper-Kafka-02节点上 测试消费

bin/kafka-console-consumer.sh --bootstrap-server ZooKeeper-Kafka-01:9092, ZooKeeper-Kafka-02:9092, ZooKeeper-Kafka-03:9092 -topic test --from-beginning

Kafka集群部署

# ZooKeeper-Kafka-03节点上 测试消费

bin/kafka-console-consumer.sh --bootstrap-server ZooKeeper-Kafka-01:9092, ZooKeeper-Kafka-02:9092, ZooKeeper-Kafka-03:9092 -topic test --from-beginning

Kafka集群部署

# producer 里输入消息,consumer 中就会显示出同样的内容,表示消费成功

4、删除 topic 和关闭服务

bin/kafka-topics.sh --delete --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181 --topic test

停止服务:

bin/kafka-server-stop.sh

5、报错小结:

#创建消费者报错:

下面是0.9之前的打开方式

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

如果0.9之后使用这个命令打开会报这样的错误

zookeeper is not a recognized option