RocketMQ 源码阅读 ---- 消息消费(普通消息)

RocketMQ Consumer 消费拉取的消息的方式有两种
1.      Push方式:rocketmq 已经提供了很全面的实现,consumer 通过长轮询拉取消息后回调 MessageListener 接口实现完成消费,应用系统只要重写 MessageListener 的方法完成业务逻辑即可
2.      Pull方式:完全由业务系统去控制,定时拉取消息,指定队列消费等等,当然这里需要业务系统去根据自己的业务需求去实现
 
下面介绍 push 方式(long-polling 长轮询方式实现):
RocketMQ 源码阅读 ---- 消息消费(普通消息)
1、DefaultMQPushConsumer 构造器初始化
DefaultMQPushConsumer 初始化 groupName,默认使用 AllocateMessageQueueAveragely 算法平均分配 queue 给 consumer。
 
3~6 订阅给定的 Topic
 
4、创建订阅数据对象
subExpression 为 null,则订阅 Topic 下全部内容
 
5、订阅数据对象放入缓存
ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner ,放入 「4」步骤构造好的订阅数据对象
 
6、发送心跳
如果 MQClientInstance 已经创建(步骤「12」创建),则将 consumer 心跳发送给所有 Broker
 
7、8、注册监听
给当前的 consumer 注册一个回调方法,当队列有消息的时候回调这个方法
 
9 ~ end 启动 consumer
 
11、复制一份订阅数据对象(其实就是 4、5 步骤,多了一个 retry+groupName 映射。重试队列里面 queue 看到只有1个,原来的队列queue有4个)
复制一份步骤「4」创建的订阅数据对象,放入缓存 ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner,topic 为重试 topic = %RETRY%{groupname}
 
12、创建 MQClientInstance
 
13、RebalanceImpl 属性初始化
 
14、PullAPIWrapper 初始化
 
15、offerset 存储
BROADCASTING 存在 consumer 本地文件(局部进度。多个 consumer 消费相同 queue,消费进度是跟自己本身 consumer 有关,所以存本地就行,其他 consumer 不关心也不会用到其他的 consumer 的消费进度)
CLUSTERING 存在远程 broker (全局进度。不同 consumer 消费不同 queue,是一个全局的进度,上报到 broker 在 consumer 宕机后才能由其他 consumer 继续消费)
 
16、load() 加载消费进度
 
17、消息消费服务初始化
  • 顺序消息消费 
  • 常规消息并发消费 ConsumeMessageConcurrentlyService
 
18、消息消费服务启动
  • 常规消息,则启动一个定时任务清理过期消息 cleanExpireMsg()
  • 顺序消息并且是 CLUSTERING 模式,则启动一个定时锁队列的任务 RebalanceImpl.lockAll()
 
19、注册 Consumer
将 consumer 信息存入 ConcurrentMap<String/* group */, MQConsumerInner> consumerTable
 
20 ~  和之前的图类似,这里就不认真画出时序关系了
20、MQClient 开始
这里的 start,和 producer 的一样,使用了同一个类同一个方法
 
21、Netty 服务启动
 
22、启动定时任务
(1)如果没有设置 nameserver 地址,则定时获取 nameserver 地址
(2)定时从 NameServer 更新 topic 路由信息(包含消费者 Set<MessageQueue> subscribeInfo 、生产者 TopicPublishInfo)
(3)定时清理下线的 Broker;给所有 Broker 发送心跳(包含订阅关系)
(4)定时持久化所有消费者 Offset(即消费进度)。 存储方式参考步骤「15」
(5)定时调整线程池
 
23、拉消息线程启动
push 方式,却使用  this.pullMessageService.start(); 拉消息服务,因为这是一个推拉结合的实现。
 
24~28 负载均衡服务
遍历步骤「5、11」赋值的 subscriptionInner
(1)Push 模式的均衡:DefaultMQPushConsumerImpl  
  •         无序 
            <1> 集群模式
                            // topicSubscribeInfoTable 是 client 从 nameserver 上拉的路由信息
                        a. ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();  
                            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic)
                            
topicSubscribeInfoTable
        "%RETRY%please_rename_unique_group_name_4" -> 
                MessageQueue [topic=%RETRY%please_rename_unique_group_name_4, brokerName=LAPTOP-P9TNK0JN, queueId=0]
        
        "TopicTest" ->
                MessageQueue [topic=TopicTest, brokerName=LAPTOP-P9TNK0JN, queueId=2]
                MessageQueue [topic=TopicTest, brokerName=LAPTOP-P9TNK0JN, queueId=3]
                MessageQueue [topic=TopicTest, brokerName=LAPTOP-P9TNK0JN, queueId=0]
                MessageQueue [topic=TopicTest, brokerName=LAPTOP-P9TNK0JN, queueId=1]
 
                           
                           
 
29、查询 ConsumerId 列表
// cidAll = [email protected]
 List<String> cidAll = findConsumerIdList(final String topic, final String group) 。先根据 group 去 broker address (优先使用 master,没有master 随机一个 slaver 地址    )查找所有 consumer list(还记得之前说过的 client 会定时向 broker 注册自己的信息么 「为什么 client 要向 broker 发心跳(this.mQClientFactory.sendHeartbeatToAllBrokerWithLock())?  发 group 信息给 broker」)。
 
 
30、使用负载均衡算法分配队列
  allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) 。然后根据步骤「1」中的分配算法 AllocateMessageQueueAveragely 给 consumr group 分配 queue。
                             AllocateMessageQueueAveragely  平均分配如下图:比如默认一个 topic 有 4 个 queue,2个人消费,就是一人分两个,平均分。根据 clientId,分配到的 queueid 固定。
                            RocketMQ 源码阅读 ---- 消息消费(普通消息)                            
                          
                              AllocateMessageQueueAveragelyByCircle 循环平均分配:    
                               RocketMQ 源码阅读 ---- 消息消费(普通消息)
31、步骤「30」负载均衡完成之后,更新本地处理队列缓存
ProcessQueue (主要是 TreeMap<Offset, Msg> 和读写锁)作用:保存了所有获取到,但是还未被处理的消息
有了 ProcessQueue 的帮助 PushConsumer 会判断获取但还未处理的消息个数、 消息总大小、 Offset 的跨度,任何一个值超过定的大小就隔一段时间再拉取消息, 从而达到流量控制的目的。 此外 ProcessQueue 还可以辅助 实现顺序消费的逻辑。
 
updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder)
                                    a. 如果队列已经删除或者队列两分钟没有拉取消息,则 removeUnnecessaryMessageQueue(mq, pq) 删除无用消息队列,并且 processQueue 置为 drop = true 状态
                                    b. 
                                        long nextOffset = this.computePullFromWhere(mq) 。计算从哪里开始拉取,根据 topic、group、queueId 查询消费进度 offset
                                        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 增加正在被消费的队列
                                        创建数据拉取请求 PullRequest                               
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset); // 消费进度
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
                                     this.dispatchPullRequest(pullRequestList)  :是在 PullMessageService 类中,将上一步对象赋值进 LinkedBlockingQueue<PullRequest> pullRequestQueue 。这是一个阻塞队列,当有有数据赋值进来,则下面方法就可以开始执行 pullMessage(pullRequest) 的方法
 
向 broker 发送请求
// PullMessageService.java
@Override
public void run() {
log.info(this.getServiceName() + " service started");
 
while (!this.isStopped()) {
try {// Push Consumer 的示例,为什么会有 PullRequest ,这是通过 long polling 长轮询方式达到 Push 效果。既有 pull 的优点不会压垮 client,又有 Push 的实时性
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
 
log.info(this.getServiceName() + " service end");
}
RocketMQ 源码阅读 ---- 消息消费(普通消息)
 
2、获取消费者 ()
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup())
ConcurrentMap<String/* group */, MQConsumerInner> consumerTable
 
3、用真正实现类去拉取消息
pullMessage(pullRequest)
        判断 processQueue.isDropped() 是否已经不可用,不可用用直接 return
        设置最后一次拉取时间戳 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis())
        确保服务状态正常 this.makeSureStateOK()
        限流,拉取还未消费超过默认值1000,则一定延时之后再拉。 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue())
        限流,拉取未消费数据量大于100M,则一定延时之后再拉。if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue())
        
        消费模式:
                        正常消费(限流,拉取消息跨度太大,一定时间后再拉。 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()))
                        顺序消费 (消费队列必须已经锁定,否则果断时间再尝试拉取)      
                                                
 
4、获取订阅数据
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
 
5、创建回调对象
PullCallback pullCallback = new PullCallback()。从 broker 异步拉取成功后回调方法
 
6、从内存中读取 commitOffsetValue
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY) // 这里获取到的是 803 与 pullRequest.getNextOffset() 获取内容一致
 
7、生成 sysFlag
这里我获取到的是 3
 
8、拉取消息
pullKernelImpl(mq,subExpression,expressionType,subVersion,offset,maxNums,sysFlag,commitOffset,brokerSuspendMaxTimeMillis,timeoutMillis,CommunicationMode communicationMode,PullCallback pullCallback)
 
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);  // 长轮询使用参数。broker 最长阻塞时间,broker 没有消息时才阻塞,有消息立刻返回。 
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);
 
        
9、获取 broker 地址 
findBrokerAddressInSubscribe(brokerName,brokerId,onlyThisBroker)  先从本地取,没有再去nameserve取。
 
10、11、使用 Netty 发起异步请求
pullMessage(addr,PullMessageRequestHeader,timeoutMillis,CommunicationMode,pullCallback)
将请求响应对应信息维护在 ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable,这样异步请求成功的时候就能在回调的时候根据 opaque,就能找到请求对应的响应
 
 
broker 接受请求
RocketMQ 源码阅读 ---- 消息消费(普通消息)
1、处理consumer发来的请求
 
2、创建响应对象
 
3、响应对象赋值 opaque
 
4、获取订阅组配置
SubscriptionGroupConfig [groupName=please_rename_unique_group_name_4, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
 
5、获取 topic 配置
ConcurrentMap<String, TopicConfig> topicConfigTable
this.topicConfigTable.get(topic)
TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
 
6、获取消费组的信息
ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable
this.consumerTable.get(group)
 
7 ~ 15、获取消息 getMessage(group,topic,queueId,offset,maxMsgNums,messageFilter)
    8、9 获取最大偏移量
    10 获取消费队列
            GetMessageResult [status=OFFSET_OVERFLOW_ONE, nextBeginOffset=805, minOffset=0, maxOffset=805, bufferTotalSize=0, suggestPullingFromSlave=false]
    11、获取队列中最小偏移量,表示队列中数据当前最小位置
    12、获取队列中最大偏移量,表示队列中数据最大的位置。 当传进来的下一个要读取 offset 和 maxoffset 相同则说明没有新的数据可读
    13、getIndexBuffer(offset) 获取
    14、commitLog.getMessage(offsetPy, sizePy)
    15、selectMappedBuffer(int pos, int size) 从 mappedByteBuffer 读取信息
     
    
上面是有消息的情况,如果没有消息,response 不立刻返回,而是靠 PullRequestHoldService.java 的 run 方法轮询处理
//PullMessageProcessor.java 
....
case ResponseCode.PULL_NOT_FOUND:
 
                    if (brokerAllowSuspend && hasSuspendFlag) {
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }
 
                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);  // broker hold 住请求
                        response = null;  // 没消息,没有马上返回 response
                        break;
                    }
....
 
 
PullRequestHoldService.java
 
@Override
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { // 允许长轮询就多等会.. 
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); // 不允许长轮询,就只延迟默认 1000ms
                }
 
                long beginLockTimestamp = this.systemClock.now();
                // 遍历 hold 住的 request list
                // 1、查看 topic 下的 queue 有没有新消息进来,有的话就返回 response。 
                // 2、没有新消息,而且hold时间超过了,之前传进来的 broker 最大挂起时间 timeoutMillis,也立即返回 response。
                // 3、剩余的,继续等待最后进入 1或者2
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
 
        log.info("{} service end", this.getServiceName());
    }
长轮询方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。
       
 
32、消息队列变更
messageQueueChanged(topic, mqSet, allocateResultSet)  将心跳信息发给所有 broker
 
 
从 broker 拉取消息后返回 consumer。对应代码就是消息怎么回调到 PullCallback
RocketMQ 源码阅读 ---- 消息消费(普通消息)
1、Netty client 的回调方法 
 
2 ~ ? 处理接收到的消息
4、从缓存中获取 opaque 对应的响应信息
ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable
ResponseFuture responseFuture = responseTable.get(opaque);
在发送请求的时候已经赋值好了 responseTable,所以在拿到 response 的时候,根据传回来的 opaque 就能对应上原来的 request。
 
5、删除缓存 opaque 信息
拿到响应对象后,就可以先从缓存删除 opaque 相应的响应信息
 
6、7、8 执行 netty 的回调,响应服务端的请求
因为在 consumer pull message 的时候,netty 是异步调用,所以响应的时候去回调步骤「8」
RocketMQ 源码阅读 ---- 消息消费(普通消息)
 
9、回调 PullCallback 的回调方法
 
10、处理返回的结果
 
11、反序列化
 
12、设置下一个偏移量
 
13、将消息放入 ProcessQueue
将消息放进 porcessQueue 的 TreeMap 数据结构里面,这个过程加锁执行
TreeMap<Long, MessageExt> msgTreeMap
 
14、15、16、17 将消息提交给消费执行线程池 consumeExecutor 消费
非顺序消息  ConsumeMessageConcurrentlyService 的 run 方法就会去回调 consumer 最初设置的 listener 回调方法,这样消息就到了 consumer 测试用例重写的方法了。
顺序消息  ConsumeMessageConcurrentlyService 会在 listener 回调前进行一些操作(例如mq锁检查),已经调用后失败的处理与非顺序消息不同。(顺序消息不能像无序消息一样,消费失败再次丢进 broker,这样就乱序了,只能延迟一会再消费。)
 
18、处理消费后的结果
广播模式消费失败,报错
集群模式消费失败,丢回 broker(sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) 这个消息不是马上就能被消费的,是有一定延迟的延迟消息),丢broker 如果失败,则自己延迟消费(submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()))
 
19、清除这批消息
 
20、持久化 offset (这属于消费完毕更新,当然还有之前启动时候的定时任务持久化 offset)
集群模式更新远程 offset(RemoteBrokerOffsetStore)
广播模式更新本地 offset
 
 
 
 
 
            <2> 广播模式
 
  •         有序
 
(2)Pull 模式的均衡:DefaultMQPullConsumerImpl