RocketMQ源码深度解析四之Producer篇

前言:本篇将从启动,Producer发送不同类型的消息的逻辑等方面,来深入了解。

(一)启动
在应用层初始化DefaultMQProducer时候,会以Producer名或者RPCHook的任一个或两个作为参数初始化DefaultMQProducer对象,然后对DefaultMQProducer对象设置NameServer地址等参数,然后调用start方法启动Producer,其实内部调用了DefaultMQProducerImpl.start 方法。大致逻辑如下:
1、检查 DefaultMQProducerImpl.ServiceState 的状态(初始化状态为
ServiceState.CREATE_JUST);只有状态为 CREATE_JUST 时才启动该Producer;
其他状态均不执行启动过程;
2、 将 DefaultMQProducerImpl.ServiceState 置为 start_failed,以免客户端同一个进程中重复启动
3、检查 producerGroup 是否合法
4、 若 producerGroup 不等于“ CLIENT_INNER_PRODUCER”则设置 Producer的实例名( instanceName);
5、 构建该 Producer 的 ClientID,等于 IP 地址@instanceName;
6,创建 MQClientInstance 对象。如果该ClientID对应的MQClientInstance存在则直接返回。说明一个 IP 客户端下面的应用,只有在启动多个进程的情况下才会创建多个 MQClientInstance 对象;
7、 将 DefaultMQProducerImpl 对象在 MQClientInstance 中注册,以producerGroup 为 key 值、 DefaultMQProducerImpl 对象为 values 值存入
8、以主题名“ TBW102”为 key 值,新初始化的 TopicPublishInfo 对象为 value值存入 DefaultMQProducerImpl.topicPublishInfoTable 变量中;
9、 入参 startFactory 等于 true, 故调用 MQClientInstance.start 方法启
动 MQClientInstance 对象;
10、设置 DefaultMQProducerImpl 的 ServiceState 为 RUNNING;
11、立即调用 MQClientInstance.sendHeartbeatToAllBrokerWithLock()方

(二)向Broker发送心跳消息
1、初始化 HeartbeatData 对象,将该 Producer 或 Consumer 的 ClientID 赋值给 HeartbeatData 对象的 clientID 变量;
2、遍历 MQClientInstance.consumerTable,根据每个 MQConsumerInner 对象的值初始化ConsumerData 对象和ProducerData对象
3、若 ConsumerData 集合和 ProducerData 集合都为空,说明没有 consumer或 produer,则不发送心跳信息;
4、若不是都为空,则遍历 MQClientInstance.brokerAddrTable 列表,向每个 Broker 地址发送请求码为 HEART_BEAT 的心跳消息,但是当存在 Consumer 时才向所有 Broker 发送心跳消息,否则若不存在 Consumer 则只向主用 Broker 地址发送心跳消息;

(三)发送普通消息
首先调用DefaultMQProducer.send方法,最终调用DefaultMQProducerImpl.sendDefaultImpl方法来完成消息的发送,大致逻辑如下:
1、 检查 DefaultMQProducerImpl 的 ServiceState 是否为 RUNNING
2、 校验 Message 消息对象的各个字段的合法性,其中 Message 对象的 body的长度不能大于 128KB;
3、以 Message 消息中的 topic 为参数获取TopicPublishInfo
4、若TopicPublishInfo不为空,设置消息重试次数和超时时间。
5,选择发送的 Broker 和 QueueId,即选择 MessageQueue 对象。
6、调用 sendKernelImpl进行消息发送
7、对于同步方式发送消息,若未发送成功,并且 Producer 设置允许选择另
一个 Broker 进行发送,则从第 4 步的检查发送失败次数和发送时间是否已经超过阀值开始重新执行;否则直接返回;
下面盗一张图:
RocketMQ源码深度解析四之Producer篇

(四)发送定时消息
目前只支持固定精度级别的定时消息,服务器按照 1-N 定义了如下级别: “1 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”; 若要发送定时消息,在应用层初始化 Message 消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别, 按照序列取相应的延迟级别,例如 level=2,则延迟为 5s;
对于延迟级别的设置只有在该消息为非事务性消息或者为提交事务消息时才会生
效,默认情况下消息是非事务性消息。‘
同样调用 DefaultMQProducer.send(Message msg)方法进行消息发送,在将消息写入 commitlog 文件时候,会将延迟消息重新封装成主题“SCHEDULE_TOPIC_XXXX”的消息存入 commitlog 中
由 ScheduleMessageService 服务线程来检测延迟消息并到期后将真正的消息再次写入 commitlog 中等待消费;

(五)发送顺序消息
Rocketmq 能够保证消息严格顺序,但是 Rocketmq 需要 producer 保证顺序消息按顺序发送到同一个 queue 中。与发生普通消息相比,在发送顺序消息时要对同一类型的消息选择同一个队列,即同一个 MessageQueue 对象。
与普通消息的发送方法 相比,仅仅是在选择MessageQueue对象上面有区别。
再盗个图
RocketMQ源码深度解析四之Producer篇

(六)发送事务消息
这里先看看实现逻辑,然后基于转账问题进行分析
事务消息是第一阶段发送Prepared消息,拿到消息的地址,第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,然后修改状态,如果第三阶段消息发送失败,RocketMQ会定期扫描消息集群中的事务消息,发现Prepared消息,会向消息发送者确认,这时候会根据发送端设置的策略来决定回滚还是继续发送确认消息,这样就能保证消息发送和本地事务同时成功同时失败。
在应用层通过初始化 TransactionMQProducer 类,可以发送事务消息,大致
逻辑如下:
1、 自定义业务类,该类实现 TransactionCheckListener 接口的checkLocalTransactionState(final MessageExt msg)方法,该方法是对于本地业务执行完毕之后发送事务消息状态时失败导致 Broker 端的事务消息一直处于PREPARED 状态的补救, Broker 对于长期处于 PREPARED 状态的事务消息发起
回查请求时, Producer 在收到回查事务消息状态请求之后,调用该checkLocalTransactionState 方法,该方法的请求参数是之前发送的事务消息,在该方法中根据此前发送的事务消息来检查该消息对应的本地业务逻辑处理的情况,根据处理情况告知 Broker该事务消息的最终状态( commit或者 rollback);
2、 自定义执行本地业务逻辑的类,该类实现 LocalTransactionExecuter 接口的 executeLocalTransactionBranch,在该方法中执行本地业务逻辑,根据业务逻辑执行情况反馈事务消息的状态( commit 或者 rollback);
3、初始化 TransactionMQProducer 类,将第 1 步中的类赋值给TransactionMQProducer.transactionCheckListener 变量; 设置检查事务状态的线程池中线程的最大值、最小值、队列数等参数值;
4、启动Producer
5、 构建事务消息 Message 对象,然后调用TransactionMQProducer.sendMessageInTransaction方法,该方法分为三个阶段:
5.1)将事务消息 Message 对象的 properties 属性中的“ TRAN_MSG”字段设
为 true,“ PGROUP”字段设为该 Producer 的 producerGroup
5.2)若发送消息的返回结果( SendResult 对象)状态为 SEND_OK,则调用
业务层实现的 executeLocalTransactionBranch 方法,执行本地业务逻辑并返回
本地事务状态 LocalTransactionStat
5.3)若若发送消息的返回结果( SendResult 对象)状态不是 SEND_OK,则
不执行本地业务逻辑,直接将本地事务状态 LocalTransactionState 置为
ROLLBACK_MESSAGE;
5.4) 调用 DefaultMQProducerImpl.endTransaction
RocketMQ源码深度解析四之Producer篇

接下来看看那个经典的场景:Bob向Smith转账100块,且他们的信息不在同一个服务器。
我们可以在第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。如果确认消息发送失败,RocketMQ会定时扫描消息,发现Prepared会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?然后我们根据逻辑处理即可。接着Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题,解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可。失败的话,直接人工解决。按照事务的流程,因为某种原因Smith加款失败,那么需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升。

(七)处理 Broker 检查事务状态的消息( CHECK_TRANSACTION_STATE)
我们之前在建立Producer时候,会指定ProducerGroup,这个主要是在事务消息的作用。主要逻辑如下:
1、根据收到的事务消息的 properties 中的“ PGROUP”参数值获取该事务消息是由哪个 producerGroup 发出的, 即获取 producerGroup 值;
2、以 producerGroup 值从 MQClientInstance.producerTable 中获取MQProducerInner 对象;
3、 根据获取请求消息的渠道解析远程 Broker 的地址;
4、调用 DefaultMQProducerImpl.checkTransactionState方法检查事务消息的状态。在该方法中初始化一个 Runnable 线程,然后将该线程放入线程池中。进行相关的处理。

(八)创建Topic
在 Broker 的配置文件中可以设置 autoCreateTopicEnable 参数为 false 或true,该参数的用处: RocketMQ 在发送消息时,会首先获取路由信息。如果是新的消息,由于 MQServer 上面还没有创建对应的 Topic,这个时候,如果上面的配置打开的话,会返回默认 TOPIC 的( RocketMQ 会在每台 broker 上面创建名为 TBW102 的 TOPIC)路由信息,然后 Producer 会选择一台 Broker 发送消息,选中的 broker 在存储消息时,发现消息的 topic 还没有创建,就会自动创建topic。后果就是:以后所有该 TOPIC 的消息,都将发送到这台 broker 上,达不到负载均衡的目的。所以建议关闭自动创建 TOPIC 的功能, 即在配置文件中将autoCreateTopicEnable 参数设置为 false,然后有两种方式定义创建 TOPIC:
1) 在 Broker 中手工配置 Topic,在启动 Broker 时会向 NameServer 注册,从而客户端就可以获取到为该 topic 提供服务的 Broker 地址信息;
2)在 Producer 发送消息之后,调用 DefaultMQProducer.createTopic