kafka总结
kafka充当消息队列
scala 2.11.X
*kafka的架构
zookeeper:集群协调管理配置工具。kafka的集群的形成和集群的基本配置信息,以及kafka的元数据信息,都保存在zookeeper中。kafka的broker连接信息也保存在zookeeper中。
Broker:kafka的每一个服务节点都被称作broker,kafka集群就是由若干个broker构成的一个服务整体。
broker节点是一个逻辑上的概念(一个broker就是指一台机器上的一个进程),不是物理上的节点。
Topic:数据的划分类别,如果把kafka认为是一个数据的话,topic就是表的概念
它可以做数据隔离,类似redis上称作的channel频道
Partition:kafka的分布式方式,一个topic可以划分成多个partition,不同的partition负责不同的数据读写请求。
Replication:kafak的容错手段,每一个partition有若干个replication,多个replication分别由不同的broker来进行负责。一个parttion的多个replicition中在某个时间点只有一个是对外提供读写服务的,这个replication被称作leader,其余的replication被称作follower,follower主动同步leader的数据。
Producer:生产者(客户端),主要负责把数据发送至kafka,producer可以随意的和任意的broker来对接。
Consumer:消费者(客户端),主要是负责取出kafka中的数据进行消费处理。consumer也可以连接任意的broker。
Consumer Group:消费组,它是被存储在kafka服务端的一个名称,每个comsumer必须属于某个消费组,不同组的消费者之间消费的数据是可以重复的,同一个消费组下的多个消费者之间,他们消费的数据是完全不会重复的,这点是kafka保证。
*kafka的集群配置:
1.broker.id=0 #每个broker都有一个唯一的标识,broker之间这个标识不能冲突
2.listeners=PLAINTEXT://:9092 #kafka服务运行时所需要占用的物理节点的端口号,同物理节点上的多个broker这个端口号不能冲突
3.log.dirs=/tmp/kafka-logs #kafka保存消息信息的文件夹目录配置,同物理节点上的多个broker这个目录不能冲突
4.zookeeper.connect=localhost:2181 #kafka服务启动需要依赖zookeeper服务,这个参数是zookeeper服务的访问url
kafka服务的启动
kafka-server-start.sh /usr/hadoop/kafka_2.11-0.10.1.0/config/server.properties &
kafka-server-start.sh /usr/hadoop/kafka_2.11-0.10.1.0/config/server1.properties &
kafka-server-start.sh /usr/hadoop/kafka_2.11-0.10.1.0/config/server2.properties &
kafka-server-start.sh /usr/hadoop/kafka_2.11-0.10.1.0/config/server3.properties &
kafka-server-start.sh /usr/hadoop/kafka_2.11-0.10.1.0/config/server4.properties &
kafka-server-start.sh /usr/hadoop/kafka_2.11-0.10.1.0/config/server5.properties &
*创建一个topic
kafka-topics.sh 提供我们对kafka的topic进行增删改查操作
kafka-topics.sh --help
--list 打印出kafka里面有多少个topic
--describe 描述一个topic的基本信息的
--create 创建topic
--alter 修改一个topic
--delete 删除一个topic
创建topic的时候需要指定两个参数:
--partitions 设置topic的partition的数量
--replication-factor 设置一个partition的replication的数量
--topic 备操作的topic的名称
--zookeeper 指定kafka集群的zookeeper的连接url地址
*创建topic
kafka-topics.sh --create --zookeeper centos1:2181 --partitions 2 --replication-factor 2 --topic bd20test1
*查看kafka有多少个topic
kafka-topics.sh --list --zookeeper centos1:2181
*查看某个topic的详细信息
kafka-topics.sh --describe --zookeeper centos1:2181 --topic bd20test1
topic默认delete不掉,需要在server.properties下面添加:
delete.topic.enable=true
配置才能保证topic直接被删掉
清除数据的规则
log.retention.hours=168
log.segment.bytes=1073741824
检查数据清除条件的时间间隔
log.retention.check.interval.ms=300000
*kafka自带客户端
生产者:kafka-console-producer.sh
消费者:kafka-console-consumer.sh
一般用于测试和调试
指令kafka-console-producer.sh的参数:
--broker-list broker的服务列表:centos1:9092,centos2:9092,centos3:9092
--topic 指定topic的名称
指令kafka-console-consumer.sh的参数:
--bootstrap-server broker的服务列表:centos1:9092,centos2:9092,centos3:9092
--zookeeper zookeeper连接入口url:centos1:2181,centos2:2181,centos3:2181
--topic 指定被消费的topic名称
--partition 指定消费topic的某个partition的数据
--offset 指定消费topic的某个partition的时候,指定其消费开始的偏移量
可以填一个数字也可以填:latest,earliest
--from-beginning 从开头开始消费
生产:
kafka-console-producer.sh --broker-list centos1:9092,centos2:9092,centos3:9092 --topic bd20test1
消费:
kafka-console-consumer.sh --bootstrap-server centos1:9092,centos2:9092,centos3:9092 --topic bd20test1 --from-beginning
kafka的保证:
1.当消息被发送到某一个特定的partition上时,kafka能保证消息是严格按照顺序存储的
2.一个消费者从一个log文件上去消费数据的时候会严格按照这个文件上的消息顺序来进行消费
3.一个topic如果有N个副本,能保证N-1副本丢失都不会影响topic的读写使用
*kafka的client要求sl4j的依赖:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.22</version>
</dependency>
然后resources目录下拷贝一个log4j.properties文件进来
--作业
1.通过查看ProducerRecord的api文档,实现producer指定partition发送消息:
指定只把消息发送到bd20test1的partition 1里面
2.使用kafka-console-consumer.sh只消费partition 0 查看上程序执行的效果
然后使用kafka-console-consumer.sh只消费partition 1 查看上程序执行的效果
查看Consumer怎么写
*kafka在消费时产生数据丢失和数据重复的问题以及解决方案
原因:offset的值是保存在kafka里面的,而被消费的数据的消费过程是在kafka外面的
kafka对数据的消费可以保证:
at least once:至少一次,可能重复 消费成功但是offset没更新成功,把offset更新放在消费成功之后
at most once:最多一次,可以丢失 消费没成功但是offset更新层拱了,把offset的更新放在消费之前
exactly once:精确一次,不会丢失也不会失败 保证消费成功和offset更新这两个动作是一个事务(是一个原子操作)
把offset和消费的结果以事务的形式保存在kafka的外部
在启动消费的时候,从外部存储总找出offset,然后指定offset的位置开始消费
代码控制kafka的offset更新
首先,配置参数,consumer上:
enable.auto.commit 设置成 false
auto.offset.reset 设置成 none
其次,代码commit
commitAsync() --异步更新offset,不阻塞当前线程
commitSync() --同步更新offset,阻塞当前线程,直到更新成功或更新失败
把最后一次poll的数据中offset+1最大的值更新过去
commitAsync(Map<TopicPartition,OffsetAndMetadata> offsets, OffsetCommitCallback callback)
commitSync(Map<TopicPartition,OffsetAndMetadata> offsets)
异步更新offset是需要rebalance触发
rebalance:负载均衡