分布式消息通信 kafka, kafka与zookeeper进行集成
Kafka是一款分布式消息发布和订阅的系统,具有高性能,高吞吐量的特点而被广泛应用于大数据传输场景,它是由LinkedIn
公司开发,使用 Scale语言编写的,之后成为Apache基金会的一个顶级项目。
kafka介绍网站
http://kafka.apache.org/documentation/#gettingStarted
kafka 是分布式的消息和订阅系统,高性能,高吞吐量,scale 语言进行编写。
内置分区,实现集群,运用在行为跟踪和日志收集这些功能上。
kafka可以使用在系统的流量监控上面,配合elk进行使用。
kafka和activemq的最大不同之处在于:
activemq主动推送消息给到 constomer 消费端,而 kafka 是去主动获得消息的
安装kafka,单机安装:
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.12-1.1.0.tgz
tar -zxvf xxxx.tgz 文件
先安装并且启动 zookeeper的集群,集群的启动操作如下博客:
https://blog.****.net/baidu_24545901/article/details/79924289
对kafka进行集群操作
对 kafka文件进行配置
编辑kafka当中的config目录下的 server.properties文件
编辑另外两台kafka当中的 config目录下的 server.properties,将它们都设置成不同的 broker.id
本篇中设置的三台broker.id与虚拟机ip地址对应关系如下:
虚拟机ip | broker.id |
192.168.159.136 | 0 |
192.168.159.135 | 1 |
192.168.159.137 | 2 |
然后把三台server.properties文件后面找到 listeners=P=PLAINTEXT://192.168.159.136:9092,三台都设置与其ip对应的listeners
然后再把zookeeper.connect 这个属性都更改一下,三台都改成改成下面的
zookeeper.connect=192.168.159.136:2181,192.168.159.135:2181,192.168.159.137:2181
启动 三台机器上的kafka
sh kafka-server-start.sh -daemon ../config/server.properties
启动之后可以在 kafka的 logs目录下查看 logs文件,查看kakfa在启动过程中是否有什么错误信息。
没有错误信息的话,那么可以登录zookeeper
./zkCli.sh -server localhost:2181
然后 ls / 可以查看 zookeeper当中又增加了哪些个节点信息,可以看到增加下下面的几个节点。
cluster、controller、controller_epoch、brokers、isr_change_notification、consumers、latest_producer_id_block、config
节点的作用:
controller:控制节点
brokers-kafka集群的 broker信息 topic
consumer ids/owners/offsets
可以查看 ls /brokers/ids 可以查看到 kafka当中集群配置的节点数目
get /controller 使用这个命令可以查看到当前是哪个节点是集群的主节点(主机器)
查看上面的brokerid 即可显示主节点.
基本操作:
接下来实现 kafka消息通信,实现 192.168.159.136与192.168.159.135之间的消息通信
首先第一步,创建一个topic
sh kafka-topics.sh --create --zookeeper 192.168.159.136:2181 --replication-factor 1 --partitions 1 --topic MyTopic
查看 kafka集群当中有多少个 topic
sh kafka-topics.sh --list --zookeeper localhost:2181 还可以登录zookeeper 查看 ls /brokers/topics 使用这个命令来查看有多少个topic
使用kafka的producer端向kafka集群发送一条消息(kafka的默认端口是 9092):
sh kafka-console-producer.sh --broker-list 192.168.159.136:9092 --topic MyTopic
然后再另外一台机器上执行 kafka的消息接收命令:
sh kafka-console-consumer.sh --bootstrap-server 192.168.159.136:9092 --topic MyTopic --from-beginning
可以看到kafka consumer接收到provider端发送过来的消息了。
kafka的实现细节:
消息
消息有key 和 value;消息批量发送
topic 和 partition
topic是用来存储消息的逻辑概念,
如果说把topic看成是一个数据库的话,那么每个partition则是数据库中的每张表
1000w的表做分表操作,分成10个表 每个表 100w
每个表partition当中存储的数据是以消息的key的hash值去做路由
kafka的高吞吐因素:
1.它是以顺序写的方式进行数据的存储;频繁的io操作,网络io和磁盘io
2.批量发送概念。
3.零拷贝
为什么是零拷贝呢,对比传统的消息传送模型 磁盘-->内存-->用户空间-->内存网卡,这些过程会经过4次上下文切换和数据的复制,频繁io操作浪费性能
而kafka的模型是这样的
日志策略:
日志保留策略
时间和大小
日志压缩策略
topic里面partition的物理存储可以通过下面的命令来观看:
sh kafka-topics.sh --create --zookeeper 192.168.159.136:2181 --replication-factor 1 --partitions 3 --topic aaatopic
这里的 --partitions 3表示的是创建的 三个 partition的节点 --topic表示的是 创建的topic名称是 aaatopic
replication-factor 1这个属性代表的含义是 交叉备份的副本数,支持多台 kafka集群进行相互之间的备份操作,可以使用副本备份增加可靠性,比如说当某台服务器上面的消息丢失了,我们可以有另外一台服务器上面的备份消息存在,保证消息不至于丢失
执行完毕这条命令之后,可以看到 kafka的日志目录 可以在server.properties当中配置 其/tmp/kafka-logs这个目录当中都各有一个节点,节点的名称后面加上了-num,比如 192.168.159.136这台服务器
kafka的可靠性机制:
生产者发送消息到 broker,有三种确认方式 (request.required.acks)
acks=0:producer不会等待broker(leader)发送ack,因为发送消息网络超时或者 broker crash
Leader还没有commit消息 2. Leader与Follower数据不同步,既有可能丢失也有可能会重发
acks=1:当leader接收到消息之后会发送ack,丢失会重发,丢失的概率很小。
acks=-1:当所有的follower都同步消息成功后发送 ack,丢失消息的可能性会比较低。
zookeeper当中的副本机制:
sh kafka-topics.sh --create --zookeeper 192.168.159.136:2181 --replication-factor 3 --partitions 3 --topic aaatopic
使用这句代码创建 kafka topic的时候 副本的时候,利用replication-factor这个属性指定副本数量为3 partitions这个属性表示的是节点的数量是 3,创建出来的副本模型可以参考下面的图:
leader选举:
ISR副本同步队列,维护的是所有的follow节点。
1.副本中的所有的节点必须要和zookeeper中保持连接状态
2.副本的最后一条消息的offset和leader副本的最后一条消息的offset之间的差值不能超过指定的阀值,这个阀值可以设置。
在zookeeper当中 isr的副本消息同步可以在这个节点下看到:
文件存储机制:
在执行了sh kafka-topics.sh --create --zookeeper 192.168.159.136:2181 --replication-factor 3 --partitions 3 --topic aaatopic 这条命令之后,kafka的partition节点会被存储在 设置的日志目录当中,默认是在 /tmp/kafka-logs/ 这个目录下的,我们先创建一个 kafka的topic
sh kafka-topics.sh --create --zookeeper 192.168.159.136:2181 --replication-factor 2 --partitions 3 --topic zzzz
kafka会在一台机器上创建下面的两个节点,
执行 ls /tmp/kafka-logs/zzzz-0 查看其中一个节点的信息
我们可以看到该partition中会存在 一个 .index的文件和另外一个 .log的文件
然后在使用kafka的一个节点去进行消息的发送
sh kafka-console-producer.sh --broker-list 192.168.159.136:9092 --topic zzzz
然后另外一端接收消息:
sh kafka-console-consumer.sh --bootstrap-server 192.168.159.136:9092 --topic MyTopic --from-beginning
执行下面的语句查看 kafka中发送的消息解释:
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/zzzz-0/00000000000000000000.log --print-data-log
看到下面的消息:
.index 和 .log之间的关系就像下面的关系: