四、RocketMQ消息消费总览
- 1.消费者启动
-
- 2.消息消费总览
-
- 1.线程ReblanceService:
- 1.作用:
- a.Rebalanceservice线程每隔20s对消费者订阅的主题进行一次队列重新分配,每一次分配都会获取主题的所有队列、从 Broker 服务器实时查询当前该主题该消费组内消费者列表,对新分配的消息队列会创建对应的 PullRequest 对象。在一个 JVM 进程中,同一个消费组同一个队列只会存在一个 PullRequest 对象 。
- b.负责多个消费者负载该主题下的多个消费队列,并且当有新的消费者加入或原消费者下线时,负责消息队列的重新分布。由于每次进行队列重新负载时会从 Broker 实时查询出当前消费组内所有消费者,并且对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消费队列从而消费消息。
- 2.触发RebalanceServcie.doRebalance操作两个场景:1.该线程的自旋20s;2.当消费者下线(发送心跳超时或直接挂掉)或新增消费者时,broker向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费;
- 3.RocketMQ消息队列分配算法:
- 1.作用:
- 2.线程PullMessageService:
- 1.作用:获取到PullRequest对象后,从broker默认每次拉取32条消息,按消息的队列偏移量顺序放在ProcessQueue中,线程PullMessageService然后将消息提交到消费者消费线程池,无论消费成功or失败,都将拉取到的消息从ProcessQueue中移除。
- 2.当从broker拉取过消息过来放入到PullRequest的treemap中,如果消费不及时,treemap会不会被oom?其实不会,因为有限流:被限流后,方法返回,不会再从broker取消息数据。
- 3.保证线程的轻量级,io线程与业务线程分离。
- a.broker侧收到consuer通过长连接的请求,通过netty的io线程方法(channelRead),交给业务线程池处理(默认是NettyClientPublicExecutor);业务线程处理完后,通过netty的io线程方法(writeAndFlush)发送给客户端
- b.consumer亦是如此
- 4.上报消费进度的方式
- 1.每个5s,周期性上报(defaultMQPushConsumerImpl.getOffsetStore().updateOffse);
- 2.在拉取消息时,通过入参也会上报进度
- 3.消息服务端broker组装消息
-
- 1.根据consumeQueue从comitlog的堆外内存获取消息时,是逐个循环索引,逐个取comitlog中消息。为什么不是批量从commitlog中获取?
- 因为consumeQueue中的相邻的索引对应的消息在commitlog中不相邻。所以commitlog文件的消息读取是随机读。虽然是随机读,仍然高效是因为有pageCahe存在,当读取范围超越了pageCache范围后,才会到磁盘。
- 2.读取消息时,是否都会命中pagecache?
- 不会的。当读取消息的物理偏移量与最新消息的物理偏移量,超过内存的40%,将会建议从片读消息。原因是:所有的commitlog文件都是从pagecahce中读取,但pagecache会有操作系统级别的置换算法的,没有命中的话,会产生缺页中断,会去磁盘上加载到pagecache中,这个由操作系统实现。
- 3.当成功获取消息后,将消息作为响应写回给客户端的两种方式?
- 由参数this.brokerController.getBrokerConfig().isTransferMsgByHeap()决定,Y:需要拷贝到用户态下的heap,然后再copy到内核态的socket,需要开销。N:zero copy-FileRegion。
- 1.根据consumeQueue从comitlog的堆外内存获取消息时,是逐个循环索引,逐个取comitlog中消息。为什么不是批量从commitlog中获取?
- 4. 消息进度管理
- 1.进度存储:
- b.集群模式:同一个消费组内的所有消息消费者共享消息主题下的所有消息, 同一条消息(同一个消息消费队列)在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化重新负载,所以消费进度存储文件存放在消息服务端Broker。
- a.广播模式:同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内的消费者的消息消费行为是独立的,互相不影响,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定
- 2.消费进度思考
- a.消费者线程池每处理完一个消息消费任务( ConsumeRequest)时会从 ProceeQueue中移除本批消费 的消息 ,并返回 ProcessQueue 中最小的偏移量,用该偏移量更新消息队列消费进度,也就是说更新消费进度与消费任务中的消息没什么关系,会带来一个潜在的重复问题?可能出现ProcessQueue的msgTreeMap的最小的偏移量一直消费失败,但后面的消息偏移量都已消费成功,并且消费失败的消息写回broker也失败,造成该最小偏移量的消息ACK卡进度,消费进度无法向前推进?
- rocketmq的解决方案
- 会定期扫描主题下的所有小覅额,达到这个timeout的那些消息,就会触发sendBack操作以达到ack的目的,推进消费进度。
- 1.进度存储:
- 1.线程ReblanceService: