RocketMQ原理解析-consumer 5.push消费-顺序消费消息
http://blog.****.net/quhongwei_zhanqiu/article/details/39143755
顺序消费服务ConsumeMessageConcurrentlyService构建的时候
构建一个线程池来接收消费请求ConsumeRequest
构建一个单线程的本地线程,用来稍后定时重新消费ConsumeRequest, 用来执行定时周期性(一秒)钟锁队列任务
周期性锁队列lockMQPeriodically
获取正在消费队列列表ProcessQueueTable所有MesssageQueue, 构建根据broker归类成MessageQueue集合Map<brokername,Set<MessageQueue>>
遍历Map<brokername,Set<MessageQueue>>的brokername, 获取broker的master机器地址,将brokerName的Set<MessageQueue>发送到broker请求锁定这些队列。 在broker端锁定队列,其实就是在broker的queue中标记一下消费端,表示这个queue被某个client锁定。 Broker会返回成功锁定队列的集合, 根据成功锁定的MessageQueue,设置对应的正在处理队列ProccessQueue的locked属性为true没有锁定设置为false
通过长轮询拉取到消息后会提交到消息服务ConsumeMessageOrderlyService,
ConsumeMessageOrderlyService的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。ConsumeRequest是由ProcessQueue和Messagequeue组成。
ConsumeRequest任务的run方法
判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息
每个messagequeue都会生成一个队列锁来保证在当前consumer内,同一个队列串行消费,
判断processQueue的lock属性是否为true,lock属性是否过期,如果为false或者过期,放到本地线程稍后锁定在消费。 如果lock为true且没有过期,开始消费消息
计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将processqueue, messagequeue重新构建ConsumeRequest加到线程池10ms后在消费,这样防止个别队列被饿死
获取客户端的消费批次个数,默认一批次为一条
从proccessqueue获取批次消息, processqueue.takeMessags(batchSize), 从msgTreeMap中移除消息放到临时map中msgTreeMapTemp,这个临时map用来回滚消息和commit消息来实现事物消费
调回调接口消费消息,返回状态对象ConsumeOrderlyStatus
根据消费状态,处理结果
1) 非事物方式,自动提交
消息消息状态为success:调用processQueue.commit方法
获取msgTreeMapTemp的最后一个key,表示提交的 offset
清空msgTreeMapTemp的消息,已经成功消费
2) 事物提交,由用户来控制提交回滚(精卫专用)
更新消费进度, 这里的更新只是一个内存offsetTable的更新,后面有定时任务定时更新到broker上去
相关推荐
- RocketMQ消息发送及消费的基本原理
- RocketMQ消费失败消息深入分析(consumer,broker的具体处理逻辑)
- RocketMQ消息顺序发送和消费问题
- RocketMQ总结(持久化,重发机制,分布式事务,监控机制,顺序消息,重复消费等等)
- RocketMQ原理解析-producer 3.如何发送顺序消息
- RocketMQ原理解析-consumer 2.消费端负载均衡
- RocketMQ原理解析-consumer 5.push消费-顺序消费消息
- RocketMQ原理解析-producer 3.如何发送顺序消息
- RocketMQ原理解析-consumer 5.push消费-顺序消费消息
- RocketMQ原理解析-consumer 4.长轮询push消息—并发消费消息
- 1月8日-1月15日学习内容
- RocketMQ简介