RocketMQ源码分析----Consumer消费进度相关

在Consumer消费的时候总有几个疑问:

  • 消费完成后,这个消费进度存在哪里
  • 消费完成后,还没保存消费进度就挂了,会不会导致重复消费

Consumer

消费进度保存

消费完成后,会返回一个ConsumeConcurrentlyStatus.CONSUME_SUCCESS告诉MQ消费成功,以MessageListener的consumeMessage为入口分析。
消费的时候,是以ConsumeRequest类为Runnable对象,在线程池中进行处理的,即ConsumeRequest的run方法会处理这个状态

        @Override
        public void run() {

            //....
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            // 如果这个ProcessQueue废弃了,则不处理
            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            }
        }

在消费完成后,将status交给processConsumeResult处理,代码如下

    public void processConsumeResult(//
                                     final ConsumeConcurrentlyStatus status, //
                                     final ConsumeConcurrentlyContext context, //
                                     final ConsumeRequest consumeRequest//
    ) {
         //....消费成功或者失败的处理
        
        // 将这批消息从ProcessQueue中移除,代表消费完毕,并返回当前ProcessQueue中的消息最小的offset
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            // 更新消费进度
            this.defaultMQPushConsumerImpl.getOffsetStore()
                .updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

在分析ProcessQueue的时候,说过removeMessage返回有两种情况:

  1. 如果移除这批消息之后已经没有消息了,那么返回ProcessQueue中最大的offset+1
  2. 如果还有消息,那么返回treeMap中最小的key,即未消费的消息中最小的offset

getOffsetStore返回RemoteBrokerOffsetStore,看下其实现

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            // 通过MessageQueue获取本地的对应的消费进度
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }

            if (null != offsetOld) {
                //increaseOnly 为false则直接覆盖
                //increaseOnly为true则会判断更新的值比老的值大才会进行更新
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }

这里的increaseOnly参数根据不同的情况传入不同的值,有些情况下会出现并发修改的情况,那么需要传入true,内部会进行CAS的操作,能保证正确的赋值,而一些场景下,只需要进行直接覆盖或者说没有并发修改的问题那么传入false就行了。

消费进度持久化

offsetTable是一个Map,其保存了消费进度,这只一个内存的结构,在Consumer启动的时候,会启动一个定时任务将本地的数据同步到broker,每persistConsumerOffsetInterval(默认为5)秒进行一次操作

    // mqs为需要持久化的队列集合
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
        if (mqs != null && !mqs.isEmpty()) {
            // 遍历本地的消费进度
            for(Map.Entry<MessageQueue, AtomicLong> entry:this.offsetTable.entrySet()){
                MessageQueue mq = entry.getKey();
                AtomicLong offset = entry.getValue();
                if (offset != null) {
                    // 如果该队列在需要持久化的队列中
                    if (mqs.contains(mq)) {
                        try {
                            // 将消费进度发送到broker
                            this.updateConsumeOffsetToBroker(mq, offset.get());
                        } catch (Exception e) {
                            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                        }
                    } else {//废弃的消费进度
                        unusedMQ.add(mq);
                    }
                }
            }
        }
        // 如果有废弃的MQ,则将其消费进度废弃
        if (!unusedMQ.isEmpty()) {
            for (MessageQueue mq : unusedMQ) {
                this.offsetTable.remove(mq);
            }
        }
    }

传入的是当前Consumer分配的MessageQueue列表,rebalance之后,可能分配的MessageQueue已经变化,所以offsetTable里有些消费进度的队列时不需要的,所以将它的消费进度废弃
updateConsumeOffsetToBroker方法就是简单的网络请求,将offset发送给Broker

消费进度提交

除了定时提交消费进度之外,在拉取消息的时候,会顺便将本地的消费进度一起传到broker,例如查看拉取消息的方法DefaultMQPushConsumerImpl#pullMessage中的一段代码

boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        // 集群消费模式
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            // 通过offsetStore获取当前消费进度
            // ReadOffsetType.READ_FROM_MEMORY表示从本地获取(即offsetTable)
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {//
                // 传给Broker,让其判断是否需要保存消费进度
                commitOffsetEnable = true;
            }
        }
        // 构造一些标志位,这里主要看commitOffsetEnable值
        // 将commitOffsetEnable放到一个int类型的值中,让broker判断是否需要保存消费进度
                int sysFlag = PullSysFlag.buildSysFlag(//
                commitOffsetEnable, // commitOffset
                true, // suspend
                subExpression != null, // subscription
                classFilter // class filter
        );
        //....
            // 通过拉取消息请求,将commitOffsetValue和sysFlag传给broker
            this.pullAPIWrapper.pullKernelImpl(//
                    pullRequest.getMessageQueue(), // 1
                    subExpression, // 2
                    subscriptionData.getSubVersion(), // 3
                    pullRequest.getNextOffset(), // 4
                    this.defaultMQPushConsumer.getPullBatchSize(), // 5
                    sysFlag, // 6
                    commitOffsetValue, // 7
                    BrokerSuspendMaxTimeMillis, // 8
                    ConsumerTimeoutMillisWhenSuspend, // 9
                    CommunicationMode.ASYNC, // 10
                    pullCallback// 11
            );

具体broker对消费进度的处理看后面分析

Broker

消费进度保存

RocketMQ的网络请求都有一个RequestCode,更新消费进度的Code为UPDATE_CONSUMER_OFFSET,通过查到其使用的地方,找到对应的Processor为ClientManageProcessor,其processRequest处理对应的请求

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx, request);
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                return this.getConsumerListByGroup(ctx, request);
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                return this.updateConsumerOffset(ctx, request);
            case RequestCode.QUERY_CONSUMER_OFFSET:
                return this.queryConsumerOffset(ctx, request);
            default:
                break;
        }
        return null;
    }

更新消费进度的方法为updateConsumerOffset,里面解析了请求体之后又调用了ConsumerOffsetManager.commitOffset方法

    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
        // [email protected] 
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}",
               clientHost, key, queueId, offset, storeOffset);
            }
        }
    }

逻辑也很简单就不多说了,有意思的是,Broker的保存消费进度的结构和Consumer类似,Broker多了一个维度,因为Broker接收的是所有消费者的进度,而Consumer保存的是自己的
在Consumer的消费进度上报到Broker之后,Broker只是保存到内存,这并不可靠,大概也能猜出,和Consumer一样,也有一个定时任务将消费进度持久化。这时,先看下ConsumerOffsetManager这个类的继承关系,他的父类是ConfigManager,这个东西很重要,是几个重要配置信息持久化类,看下其继承关系:
RocketMQ源码分析----Consumer消费进度相关
分别是订阅关系管理,消费进度管理,Topic信息管理,和延迟队列信息管理,这4个配置信息都需要通过ConfigManager去持久化和加载,看下ConfigManager的几个方法

public abstract class ConfigManager {
    // 将对象转换成json串
    public abstract String encode();

    //将文件里内容(json格式)的转换成对象
    public boolean load() {
        String fileName = null;
            // 获取文件地址
            fileName = this.configFilePath();
            // 将文件里的内容读取出来
            String jsonString = MixAll.file2String(fileName);
            // json转换成指定对象的数据
            this.decode(jsonString);
    }
    // 配置文件地址
    public abstract String configFilePath();
    
    // 与load类似
    private boolean loadBak() {
        String fileName = null;
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName + ".bak");
            this.decode(jsonString);
        return true;
    }
    // json转换成指定对象的数据
    public abstract void decode(final String jsonString);
    // 将对象里的数据转换成json并持久化到configFilePath()文件中
    public synchronized void persist() {
        String jsonString = this.encode(true);
            String fileName = this.configFilePath();
                MixAll.string2File(jsonString, fileName);
        
    }

    public abstract String encode(final boolean prettyFormat);

那么ConsumerOffsetManager会实现encode和decode方法并在某个地方定时调用persist方法,查看其使用的地方,找到BrokerController的initialize方法,有段定时任务如下:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

可以看到,每flushConsumerOffsetInterval(默认5000)毫秒会进行一次持久化

拉取消息的时候保存消费进度

拉取消息的Code为RequestCode.PULL_MESSAGE,对应的Processor为PullMessageProcessor,找到其中消费进度处理的地方

// 上面说的consumer传过来的commitOffsetEnable
// 当Consumer本地消费进度大于0的时候这个参数为true
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.

// brokerAllowSuspend在处理消息请求的时候为true,hold请求自己处理是false
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
// Master才需要保存进度,slave只是同步broker的消息
storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(
        RemotingHelper.parseChannelRemoteAddr(channel),
        requestHeader.getConsumerGroup(), 
        requestHeader.getTopic(), 
        requestHeader.getQueueId(), 
        requestHeader.getCommitOffset());//consumer传上来的offset
}

总的来说:
当broker为master的时候,且Consumer消费进度大于0则在拉取消息的时候顺便将消费进度保存到broker

问题分析

重复消费问题

在ProcessQueue的removeMessage的第二种情况有个问题,假设有如下情况:
批量拉取了4条消息ABCD,分别对应的offset为400|401|402|403,此时consumeBatchSize(批量消费数量,默认为1,即一条一条消费),那么会分4个线程去消费这几个消息,出现下面消费次序
消费D -> removeMessage -> 返回400(情况2)
消费C -> removeMessage -> 返回400(情况2)
消费B -> removeMessage -> 返回400(情况2)
消费A -> removeMessage -> 返回404(情况1)

在消费A之前,本地消费进度持久化到Broker之后,应用宕机了,那么此时Broker保存的是offset=400(准确来说,在消费完A且保存消费进度到broker之前,offset都是400)。那么会有什么问题呢?
先假设消费完DCB且消费进度上传完成宕机,然后重启应用,这时候会先从broker获取应该从哪里消费(),因为DCB消费完成后都是保存400这个消费进度,那么返回的是400,这时候consumer会请求offset为400的消费,到这里,已经重复消费了DCB。

消费进度保存在哪里

  1. consumer保存在内存,定时上传broker
  2. broker保存在内存,定时刷新到磁盘文件

:以上没有特别声明的都是并发消费模式

整体流程图

RocketMQ源码分析----Consumer消费进度相关