关于kafka的一些问题
1. kafka为什么要与zookeeper一起使用
2. kafka中最重要的组件是哪些
3. 什么是消费者组 为什么有什么用
4. zookeeper起什么作用
5. 没有zookeeper kafka可以运行吗
6. kafka编程是最重要的api有几个
7. kafka中leader flower 是什么意思
8. 什么叫isr
9. kafka启动流程是什么
10. kafka 接到的信息最大多大 是否可以更改
11. kafka的cluster 是什么意思
12. kafka怎么调优
13. kafka的缺点
Kafka将元数据信息保存在Zookeeper中,但是发送给Topic本身的数据是不会发到Zk上的,否则Zk就疯 了。kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition的leader建立socket连接并发送消息。也就是说每个Topic的partition是由Lead角色的Broker端使用zookeeper来注册broker信息,以及监测partition leader存活性.Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.
主题(topic) 、分区(partition)、生产者(Producers)、消费者(Consumers)、经纪人(Brokers)
consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。个人认为,理解consumer group记住下面这三个特性就好了:
1)consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
2)group.id是一个字符串,唯一标识一个consumer group
3)consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)
leader 选举 和 follower 信息同步
Broker注册
Topic注册
生产者的负载均衡
消费者的负载均衡
记录分区与消费者的关系
不行,kafka的设计中就依赖于zookeeper
Producer的API、Consumer的API、Kafka Hadoop Consumer API
leader是领导者、flower是跟随者
8. 什么叫isr
leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护
加载配置文件 => 赋值给serverProps => 启动了一个内部的监控服务 => kafkaServerStartable.startup =>首先判断是否目前正在关闭中或者已经启动了,这两种情况直接抛出异常。然后是一个CAS的操作isStartingUp,防止线程并发操作启动,判断是否可以启动。如果可以启动,就开始我们的启动过程。
• 构造Metrics类
• 定义broker状态为启动中starting
• 启动定时器kafkaScheduler.startup()
• 构造zkUtils:利用参数中的zk信息,启动一个zk客户端
• 启动文件管理器:读取zk中的配置信息,包含consumer_offsets和__system.topic。重点是启动一些定时任务,来删除符合条件的记录(cleanupLogs),清理脏记录(flushDirtyLogs),把所有记录写到一个文本文件中,防止在启动时重启所有的记录文件(checkpointRecoveryPointOffsets)。
-》首先判断是否目前正在关闭中或者已经启动了,这两种情况直接抛出异常。然后是一个CAS的操作isStartingUp,防止线程并发操作启动,判断是否可以启动。如果可以启动,就开始我们的启动过程。
• 构造Metrics类
• 定义broker状态为启动中starting
• 启动定时器kafkaScheduler.startup()
• 构造zkUtils:利用参数中的zk信息,启动一个zk客户端
• 启动文件管理器:读取zk中的配置信息,包含consumer_offsets和__system.topic。重点是启动一些定时任务,来删除符合条件的记录(cleanupLogs),清理脏记录(flushDirtyLogs),把所有记录写到一个文本文件中,防止在启动时重启所有的记录文件(checkpointRecoveryPointOffsets)。
Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
针对这个问题,有以下几个建议:
• 最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。
• 第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。
• 第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。
不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:
broker 配置:
• message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。
• log.segment.bytes (默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。
• replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
consumer 配置:
fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。
• 性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。
• 可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。
• 垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。
一切的一切,都需要在权衡利弊之后,再决定选用哪个最合适的方案。
参考配置如下:
replica.fetch.max.bytes=4194304
message.max.bytes=4000000
compression.codec=snappy
max.partition.fetch.bytes=4194304
kafka的集群
Broker参数配置
1、网络和io操作线程配置优化
broker处理消息的最大线程数(默认为3)
num.network.threads=cpu核数+1
broker处理磁盘IO的线程数
num.io.threads=cpu核数*2
2、log数据文件刷盘策略
每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000
每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000
3、日志保留策略配置
保留三天,也可以更短 (log.cleaner.delete.retention.ms)
log.retention.hours=72
段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件
log.segment.bytes=1073741824
4、Replica相关配置
default.replication.factor:3
这个参数指新创建一个topic时,默认的Replica数量,Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜。
Java API调优
1、zookeeper.session.timeout.ms
解释:配置的超时时间太短,Zookeeper没有读完Consumer的数据,连接就被Consumer断开了!
参数:5000
2、zookeeper.sync.time.ms
解释:ZooKeeper集群中leader和follower之间的同步的时间
参数:2000
3、auto.commit.enable=true
解释:注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交
4、auto.commit.interval.ms
解释:自动提交offset到zookeeper的时间间隔
参数:1000
5、zookeeper.connection.timeout.ms
解释:确认zookeeper连接建立操作客户端能等待的最长时间
参数:10000
6、rebalance.backoff.ms
解释:消费均衡两次重试之间的时间间隔
参数:2000
7、rebalance.max.retries
解释:消费均衡的重试次数
参数:10
重复消息:kafka保证每条消息至少送达一次,虽然几率很小,但一条消息可能被送达多次
消息乱序:kafka某一次固定的partition内部的消息是保证是序的,如果是个Topic有多个Partition,Partition之间的消息送达不保证有序
复杂性:Kafka需要Zookeeper的支持,Topic一般人工创建,部署和维护比一般的MQ成本高