RocketMQ客户端源码分析-DefaultMQProducer

结构

DefaultMQProducer包含了DefaultMQProducerImpl,而DefaultMQProducerImpl又包含了MQClientInstance。
RocketMQ客户端源码分析-DefaultMQProducer

start

start的逻辑主要在MQClientInstance,做了这四个事情:

  • this.mQClientAPIImpl.start();
    APIImpl主要负责对外的API请求,比如在需要获取broker状态时就需要调用APIImpl的方法来获取,APIImpl里包含一个NettyRemoteClient,用于和nameServer通讯。
  • this.startScheduledTask();
    启动一些定时任务
  • this.pullMessageService.start();
    启动拉消息服务,无论是使用push还是pull来获取消息,实际都是在pull,只是自动pull还是手动pull的区别
  • this.rebalanceService.start();
    消费组里的消费者会分摊队列,就是通过rebalanceService来实现

send

send的逻辑主要在DefaultMQProducerImpl中,默认是以sync的方式发布,超时时间为3s,我们主要看sendDefaultImpl方法。

1. tryToFindTopicPublishInfo //获取Topic信息
2. 循环发送操作直至成功
	2.1 selectOneMessageQueue //选择topic下的一个队列
	2.2 sendKernelImpl //核心的发送操作
  • 匹配TopicPublishInfo
    (1)先在本地缓存的topicPublishInfoTable里匹配,且校验取出的TopicPublishInfo是否ok
    (2)如果本地缓存没有或者info中的信息不全,则通过APIImpl与NameServer通讯获取TopicRouteData。
    (3)根据TopicRouteData更新topicPublishInfoTable和brokerAddrTable。

  • 选择一个队列
    实际的选择逻辑在selectOneMessageQueue的selectOneMessageQueue中。
    选择的时候是根据原子的线程变量sendWhichQueue来进行选择,初始值为随机数,每次使用时加1,然后对messageQueue的大小进行求余来选择队列。总的来说,对于每个线程,第一次发送的队列是随机,此后就是轮流发送。

  • sendKernelImpl
    (1)findBrokerAddressInPublish
    从本地缓存的brokerAddrTable中取出地址,如果失败则重新从nameServer获取
    (2)setUniqID
    创造唯一的消息ID,后面会用于消息确认、去重等。
    (3)构造RequestHeader
    (4)通过APIImpl将消费发送给Broker
    将RequestHeader和msg传给APIImpl,由APIImpl组成RemotingCommand发给broker,等收到回复后返回。

总结

  • RocketMQ客户端与NameServer通讯获取主题信息,并且缓存到本地。
  • 主题信息包括主题下的队列,以及队列所在的broker信息。
  • RocketMQ客户端可以指定发送到某一个队列,或者默认轮流发送。
  • RocketMQ客户端发消息是直接与队列所在的broker通讯。