6、Kafka消息重复

生产者:
kafka生产者在发送数据的时候,通常会有同步与异步发送,异步就是缓存部分数据, 达到一定条数或时间后批量发送,效率高效。
那么,不管同步还是异步,消息是否发送成功,Kafka通过acks这个参数来控制的:
0—表示不进行消息接收是否成功的确认;
1—表示当Leader接收成功时确认;
-1(all)—表示Leader和Follower都接收成功时确认;
通常为了兼顾效率与数据安全,将acks设置为1,只让每个分区的leader确认收到消息即可,不等副本是否同步数据完毕。

那么,在生产者发送数据到kafka后,如果返回成功的时候,由于网络等原因出现异常, 那么生产者是收不到成功信号的,会重发,导致消息重复

那么kafka如何解决生产者发送重复消息呢?
引入幂等性:kafka producer在进行retry重试时,只会生成一个消息。
幂等性实现:PID和Sequence Number
为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。 PID:每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是 不可见的。
Sequence Numbler:(对于每个PID,该Producer发送数据的每个<Topic, Partition> 都对应一个从0开始单调递增的Sequence Number。
Broker端在缓存中保存了这seq number,对于接收的每条消息,如果其序号大于Broker 缓存中序号则接受它,否则将其丢弃。这样就可以避免消息重复提交了。
但是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。
6、Kafka消息重复
实现幂等之后
6、Kafka消息重复
生成PID的流程
Producer<String, String> producer = new KafkaProducer<String, String>(props);

幂等性的应用实例
配置属性
enable.idempotence:需要设置为ture,此时就会默认把acks设置为all,所以不需要再设置acks属性了。

消费者:
消费者在拉取消息后处理,还没来得及提交offset,程序就发生异常或网络错误,此时会发生重复消费

max.poll.interval.ms
同一个consumer实例两次调用poll方法的最大间隔时间,如果超过这个时间,kafka 会认为此consumer死掉了,一般是在长时间处理业务时发生,然后kafka会rebanlance,把这个consumer对应的partition分区重新分配到consumer group中其他的消费者,此时就会发生重复消费,因为consumer还没来得及提交offset就被kafka认为挂掉了,消费者会从最新的已提交偏移量处开始消费
默认300000-5分钟,建议设置大一点

max.poll.records
poll方法依次拉取记录数,默认500,建议设置小一点

session.timeout.ms
kafka broker和consumer客户端之间发送心跳heartbeat的最大时间间隔,如果超过 这个最大时间,则认为consumer客户端掉线了,然后会把对应的partition分区分配 给consumer group其他的消费者,此时也会发生重复消费
默认10000-10秒,建议默认值

如何避免重复带来的影响?
1、消息定义一个唯一标识符uuid,消费时先判断是否存在此消息,存在则丢弃,影响性能;
2、优化消费者处理能力,对于慢消费者可以提供阻塞队列和线程池;
3、消费者指定分区消费,这样即使当一个消费者挂掉之后其他消费者也不会获取到这个消费者对应的分区,即时恢复消费者消费消息,弊端是如何保证即时恢复消费者;