kafka之九 稳定性
稳定性
- kafka的消息传输机制很直观,如果生产者向broker发送消息,commit之后.会被存到副本里面,他就不会丢失了.
- 如果在发送之后,网络出现问题,producer无法判断消息是否commit了,但是可以retry多次,直到确认已经在broker那commit.也就是至少一次,at least once
幂等性
- 对接口调用产生的结果和调用一次的是一致的,主要解决生产者在进行重试的时候可能会重复写入消息
- 限制条件
- 只能保证Producer在单个会话内不丢不管,如果Producer出现意外挂掉再重启是无法保证
- 幂等性无法跨越多个Topic-Partition,只能保证单个分区内的幂等性
- 使用:把Producer的配置enable.idempotence设置为true
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,“true”);
props.put(“acks”,“all”);//当enable.idempotence为true,这里默认为all
事务
- 幂等性只能保证单个分区,而事务可以保证多个分区写入的原子性
- 使用:需要提供唯一的transactionalId
properties.put(ProducerConfig.transational_id_config,transactionId);
- 前置条件,设置上面的transcationalId之外,还需要把ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true,
如果设置为false,会抛出异常 - 方法:KafkaProducer提供5个和事务相关的方法
- public void initTransactions();初始化事务
- public void beginTransaction():开始事务
- public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) :为消费组提供事务内的唯一提交操作 - public void commitTransaction(),提交
- public void abortTransaction()回滚事务
控制器
- kafka集群中会有至少一个broker,会有且只有一个broker被选举为控制器,负责管理集群中所有分区和副本的状态,
- 如果分区的leader副本出现故障.控制器负责为该分区选举新的leader副本
- 分区的ISR集合发生变化时,由控制器负责同志所有broker更新数据信息
- 使用kafka-topics.sh脚本给topic增加分区数量时,还是控制器负责分区的重新分配
- kafka控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controkker这个临时节点
- 监听partition相关的变化
- 监听topic相关的变化
- 监听broker相关的变化
- 从Zookeeper中读取获取当前所有和topic,partition和broker有关信息,进行相应的管理
- zookeeper选举机制
- 每个broker启动的时候回尝试去读取/controller节点的brokerId的值,
- 如果brokerId不为-1,表示已经有其他的broker节点成功竞选为控制器了,
- 如果不存在/controller这个节点.或者这个节点的数据异常,就会尝试去创建这个节点
- 只有成功创建/controller这个节点的broker才能成为控制器
- 除了竞选成功的broker,其他的broker都会在内存中保存当前控制器的brokerId值,这个可以被标识位activeControllerid
- zookeeper里面有一个/controller_epoch节点,这个节点是持久节点,在里面存放一个整型的
controller_epoch值,用来记录控制器发生变更的次数,也就是控制器的版本- 每个和controller交互的请求都会带上controller_epoch,如果这个值不等于内存中controller_epoch值.则说明控制器已经更新了,
自己这个请求会被作废,并重新拉取controller_epoch的值保存在内存中.保证一致性
- 每个和controller交互的请求都会带上controller_epoch,如果这个值不等于内存中controller_epoch值.则说明控制器已经更新了,
- 每个broker启动的时候回尝试去读取/controller节点的brokerId的值,
可靠性保证
- 可靠性保证:确保系统在不同环境下能够发生一致的行为
- kafka的保证
- 保证分区消息的顺序
- 如果使用同一个生产者往同一个分区写入消息,而且消息b在消息a之后写入
- kafka可以保证消息b的偏移量比消息a的偏移量大,而且消费组会先读取到a再读取到b
- 只有消息被写入分区所有同步副本时,才被认为是已提交
- 生产者可以选择接受不同类型的确认,控制系数acks
- 只要有一个副本时活跃的,已提交的消息就不会丢失
- 消费组只能读取已经提交的消息
- 保证分区消息的顺序
- 副本失效
- 判断副本是否失效,由一个参数replica.lag.time.max.ms默认为10000来控制
- 当ISR的一个follower副本之后leader副本的时间超过replica.lag.time.max.ms(简称replica)的值,就是判断为失效,被剔除在ISR之外
- 副本复制:follower副本将leader的LEO(log end offset)之前的日志全部同步时,认为follower已经追上了leader副本,会更改这个副本的lastCaughtUpTimeMs标识
- kafka的副本管理器(ReplicaManager)启动时会启动副本过期检测的定时任务,而这个定时任务会定时检查当前时间和副本的lastCaughtUpTimeMs之间的差是否大于replica,
- 如果leader副本流入速度大大高于follower副本的复制速度的话,可能刚复制完,被定时器检测到,就被剔除了
- 副本不同步leader原因
- 慢副本.复制消息跟不上leader的写入,所以被判定失效,从ISR剔除
- 卡住副本,由于GC暂停或者follower失效或死亡,都会停止从leader拉取请求
一致性保证
- 前置条件:HW(high Watermark):高水位,标识一个特定的offset,消费组只能拉取到这个offset之前的消息
- leader宕机之后,从ISR选举之后,新的leader可以保证HW之前的数据,确保消费者能继续看到HW之前已经提交的数据
- HW截断机制,新的leader选举之后,为了保证数据一致性,所有follower都将HW之后的数据截断
- 宕机的leader恢复后,也会将自己的数据在新leader的HW位置之后数据全部截断,保证数据一致性
- 使用HW的话可能会导致数据丢失和数据不一致,因为HW的更新时异步延迟的,需要额外的FETCH请求处理流程才能更新
- 解决方法:leader专门保存leader的epoch信息(epoch,offset),并定期写到checkpoint文件中
- epoch就是版本号.offset就是该epoch版本写入的位移
消息重复的场景和解决方案
生产者端重复
- 问题原因:生产发送的消息没有收到正确的brok响应,导致producer重试
- 解决方案:
- 启动kafka的幂等性,修改配置文件:enable.idempotence=true,同时ack = all且retries>1
- ack=0 ,不重试:可能会丢消息,适用于吞吐量指标重要性高于数据丢失
消费者重复
- 问题原因:数据消费完没有及时提交offset到broker
- 解决方案
- 取消自动提交:每次消费完或程序退出时手动提交,但是可能也没法保证
- 下游做幂等:比如将offset记录其他地方,做唯一性校验,通过锁,事务等保证
_consumer_offsets
- _consumer_offsets是一个内部topic,保存的是kafka新版本consumer的位移信息
- 当集群中第一有消费者消费消息时会有自动创建主题_consumer_offsets,分区数可以通过offset.topic.num.partitions参数设定,默认50