kafka之九 稳定性

稳定性

  • kafka的消息传输机制很直观,如果生产者向broker发送消息,commit之后.会被存到副本里面,他就不会丢失了.
  • 如果在发送之后,网络出现问题,producer无法判断消息是否commit了,但是可以retry多次,直到确认已经在broker那commit.也就是至少一次,at least once

幂等性

  • 对接口调用产生的结果和调用一次的是一致的,主要解决生产者在进行重试的时候可能会重复写入消息
  • 限制条件
    • 只能保证Producer在单个会话内不丢不管,如果Producer出现意外挂掉再重启是无法保证
    • 幂等性无法跨越多个Topic-Partition,只能保证单个分区内的幂等性
  • 使用:把Producer的配置enable.idempotence设置为true

    Properties props = new Properties();
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,“true”);
    props.put(“acks”,“all”);//当enable.idempotence为true,这里默认为all

事务

  • 幂等性只能保证单个分区,而事务可以保证多个分区写入的原子性
  • 使用:需要提供唯一的transactionalId

    properties.put(ProducerConfig.transational_id_config,transactionId);

  • 前置条件,设置上面的transcationalId之外,还需要把ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true,
    如果设置为false,会抛出异常
  • 方法:KafkaProducer提供5个和事务相关的方法
    • public void initTransactions();初始化事务
    • public void beginTransaction():开始事务
    • public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
      String consumerGroupId) :为消费组提供事务内的唯一提交操作
    • public void commitTransaction(),提交
    • public void abortTransaction()回滚事务

控制器

  • kafka集群中会有至少一个broker,会有且只有一个broker被选举为控制器,负责管理集群中所有分区和副本的状态,
    • 如果分区的leader副本出现故障.控制器负责为该分区选举新的leader副本
    • 分区的ISR集合发生变化时,由控制器负责同志所有broker更新数据信息
    • 使用kafka-topics.sh脚本给topic增加分区数量时,还是控制器负责分区的重新分配
  • kafka控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controkker这个临时节点
    • 监听partition相关的变化
    • 监听topic相关的变化
    • 监听broker相关的变化
    • 从Zookeeper中读取获取当前所有和topic,partition和broker有关信息,进行相应的管理
  • zookeeper选举机制
    • 每个broker启动的时候回尝试去读取/controller节点的brokerId的值,
      • 如果brokerId不为-1,表示已经有其他的broker节点成功竞选为控制器了,
      • 如果不存在/controller这个节点.或者这个节点的数据异常,就会尝试去创建这个节点
      • 只有成功创建/controller这个节点的broker才能成为控制器
      • 除了竞选成功的broker,其他的broker都会在内存中保存当前控制器的brokerId值,这个可以被标识位activeControllerid
    • zookeeper里面有一个/controller_epoch节点,这个节点是持久节点,在里面存放一个整型的
      controller_epoch值,用来记录控制器发生变更的次数,也就是控制器的版本
      • 每个和controller交互的请求都会带上controller_epoch,如果这个值不等于内存中controller_epoch值.则说明控制器已经更新了,
        自己这个请求会被作废,并重新拉取controller_epoch的值保存在内存中.保证一致性

可靠性保证

  • 可靠性保证:确保系统在不同环境下能够发生一致的行为
  • kafka的保证
    • 保证分区消息的顺序
      • 如果使用同一个生产者往同一个分区写入消息,而且消息b在消息a之后写入
      • kafka可以保证消息b的偏移量比消息a的偏移量大,而且消费组会先读取到a再读取到b
    • 只有消息被写入分区所有同步副本时,才被认为是已提交
      • 生产者可以选择接受不同类型的确认,控制系数acks
    • 只要有一个副本时活跃的,已提交的消息就不会丢失
    • 消费组只能读取已经提交的消息
  • 副本失效
    • 判断副本是否失效,由一个参数replica.lag.time.max.ms默认为10000来控制
    • 当ISR的一个follower副本之后leader副本的时间超过replica.lag.time.max.ms(简称replica)的值,就是判断为失效,被剔除在ISR之外
    • 副本复制:follower副本将leader的LEO(log end offset)之前的日志全部同步时,认为follower已经追上了leader副本,会更改这个副本的lastCaughtUpTimeMs标识
    • kafka的副本管理器(ReplicaManager)启动时会启动副本过期检测的定时任务,而这个定时任务会定时检查当前时间和副本的lastCaughtUpTimeMs之间的差是否大于replica,
    • 如果leader副本流入速度大大高于follower副本的复制速度的话,可能刚复制完,被定时器检测到,就被剔除了
  • 副本不同步leader原因
    • 慢副本.复制消息跟不上leader的写入,所以被判定失效,从ISR剔除
    • 卡住副本,由于GC暂停或者follower失效或死亡,都会停止从leader拉取请求kafka之九 稳定性

一致性保证

  • 前置条件:HW(high Watermark):高水位,标识一个特定的offset,消费组只能拉取到这个offset之前的消息
  • leader宕机之后,从ISR选举之后,新的leader可以保证HW之前的数据,确保消费者能继续看到HW之前已经提交的数据
  • HW截断机制,新的leader选举之后,为了保证数据一致性,所有follower都将HW之后的数据截断
  • 宕机的leader恢复后,也会将自己的数据在新leader的HW位置之后数据全部截断,保证数据一致性
  • 使用HW的话可能会导致数据丢失和数据不一致,因为HW的更新时异步延迟的,需要额外的FETCH请求处理流程才能更新
    • 解决方法:leader专门保存leader的epoch信息(epoch,offset),并定期写到checkpoint文件中
    • epoch就是版本号.offset就是该epoch版本写入的位移kafka之九 稳定性
      kafka之九 稳定性

消息重复的场景和解决方案

生产者端重复

  • 问题原因:生产发送的消息没有收到正确的brok响应,导致producer重试
  • 解决方案:
    • 启动kafka的幂等性,修改配置文件:enable.idempotence=true,同时ack = all且retries>1
    • ack=0 ,不重试:可能会丢消息,适用于吞吐量指标重要性高于数据丢失

消费者重复

  • 问题原因:数据消费完没有及时提交offset到broker
  • 解决方案
    • 取消自动提交:每次消费完或程序退出时手动提交,但是可能也没法保证
    • 下游做幂等:比如将offset记录其他地方,做唯一性校验,通过锁,事务等保证

_consumer_offsets

  • _consumer_offsets是一个内部topic,保存的是kafka新版本consumer的位移信息
  • 当集群中第一有消费者消费消息时会有自动创建主题_consumer_offsets,分区数可以通过offset.topic.num.partitions参数设定,默认50