RocketMQ客户端源码分析-DefaultMQProducer
结构
DefaultMQProducer包含了DefaultMQProducerImpl,而DefaultMQProducerImpl又包含了MQClientInstance。
start
start的逻辑主要在MQClientInstance,做了这四个事情:
-
this.mQClientAPIImpl.start();
APIImpl主要负责对外的API请求,比如在需要获取broker状态时就需要调用APIImpl的方法来获取,APIImpl里包含一个NettyRemoteClient,用于和nameServer通讯。 -
this.startScheduledTask();
启动一些定时任务 -
this.pullMessageService.start();
启动拉消息服务,无论是使用push还是pull来获取消息,实际都是在pull,只是自动pull还是手动pull的区别 -
this.rebalanceService.start();
消费组里的消费者会分摊队列,就是通过rebalanceService来实现
send
send的逻辑主要在DefaultMQProducerImpl中,默认是以sync的方式发布,超时时间为3s,我们主要看sendDefaultImpl方法。
1. tryToFindTopicPublishInfo //获取Topic信息
2. 循环发送操作直至成功
2.1 selectOneMessageQueue //选择topic下的一个队列
2.2 sendKernelImpl //核心的发送操作
-
匹配TopicPublishInfo
(1)先在本地缓存的topicPublishInfoTable里匹配,且校验取出的TopicPublishInfo是否ok
(2)如果本地缓存没有或者info中的信息不全,则通过APIImpl与NameServer通讯获取TopicRouteData。
(3)根据TopicRouteData更新topicPublishInfoTable和brokerAddrTable。 -
选择一个队列
实际的选择逻辑在selectOneMessageQueue的selectOneMessageQueue中。
选择的时候是根据原子的线程变量sendWhichQueue来进行选择,初始值为随机数,每次使用时加1,然后对messageQueue的大小进行求余来选择队列。总的来说,对于每个线程,第一次发送的队列是随机,此后就是轮流发送。 -
sendKernelImpl
(1)findBrokerAddressInPublish
从本地缓存的brokerAddrTable中取出地址,如果失败则重新从nameServer获取
(2)setUniqID
创造唯一的消息ID,后面会用于消息确认、去重等。
(3)构造RequestHeader
(4)通过APIImpl将消费发送给Broker
将RequestHeader和msg传给APIImpl,由APIImpl组成RemotingCommand发给broker,等收到回复后返回。
总结
- RocketMQ客户端与NameServer通讯获取主题信息,并且缓存到本地。
- 主题信息包括主题下的队列,以及队列所在的broker信息。
- RocketMQ客户端可以指定发送到某一个队列,或者默认轮流发送。
- RocketMQ客户端发消息是直接与队列所在的broker通讯。