Rocketmq总结
RocketMQ总结
功能:
应用解耦
流量消峰
消息分发
RoketMQ安装
下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip
安装:unzip rocketmq-all-4.2.0-bin.zip -d ./rocketmq-all-4.2.0-bin
启动消息队列
-
启动NameServer nohup sh mqnamesrv &查看日志:tail -f ~/logs/rocketmqLogs/namesrv.log
-
启动broker nohup sh mqbroker -n localhost:9876(如果没有配置的话直接启动)查看日志:tail -f ~/logs/rocketmqLogs/broker.log
-
命令发送队列和接受队列export NAMESRV_ADDR=localhost:9876sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer启动生产端sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 启动运行消费端
-
关闭消息
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
rocketMQ角色
Producer
生产者(发信者)
Consumer
消费者(收信者)
Broker
消息队列的主体(负责暂存消息和传输消息)
NameServer
消息队列的协调(协调各个队列之间的关系)
集群配置
配置Master角色的broker和一个Slave文件互为主备
192.168.91.128 (Master Broker) 文件路径:/自己解压的所在文件夹/rocketmq-all-4.2.0-binls/conf/2m-2s-sync/broker-a.properties
brokerClusterName=DefaultClusterbrokerName=broker-abrokerId=0deleteWhen=04fileReservedTime=48brokerRole=SYNC_MASTERflushDiskType=SYNC_FLUSHlistenPort=10911namesrvAddr=192.168.91.128:9876;192.168.91.129:9876storePathRootDir=/home/rocketmq/store-a
192.168.91.129(Master Broker) 文件路径:/自己解压的所在文件夹/rocketmq-all-4.2.0-binls/conf/2m-2s-sync/broker-b.properties
brokerClusterName=DefaultClusterbrokerName=broker-bbrokerId=0deleteWhen=04fileReservedTime=48brokerRole=SYNC_MASTERflushDiskType=SYNC_FLUSHlistenPort=10911namesrvAddr=192.168.91.128:9876;192.168.91.129:9876storePathRootDir=/home/rocketmq/store-b
192.168.91.128(Slave Broker) 文件路径:/自己解压的所在文件夹/rocketmq-all-4.2.0-binls/conf/2m-2s-sync/broker-b-s.properties
namesrvAddr=192.168.91.128:9876;192.168.91.129:9876brokerClusterName=DefaultClusterbrokerName=broker-bbrokerId=1deleteWhen=04fileReservedTime=48brokerRole=SLAVEflushDiskType=ASYNC_FLUSHlistenPort=11011storePathRootDir=/home/rocketmq/store-b
192.168.91.129(Slave Broker) 文件路径:/自己解压的所在文件夹/rocketmq-all-4.2.0-binls/conf/2m-2s-sync/broker-a-s.properties
namesrvAddr=192.168.91.128:9876;192.168.91.129:9876brokerClusterName=DefaultClusterbrokerName=broker-abrokerId=1deleteWhen=04fileReservedTime=48brokerRole=SLAVEflushDiskType=ASYNC_FLUSHlistenPort=11011storePathRootDir=/home/rocketmq/store-a
参数介绍
-
namesrvAddr=192.168.91.128:9876;192.168.91.129:9876NamesrvAddr 是地址 可以是多个用封号隔开
-
brokerClusterName=DefaultClusterCluster的地址,如果集群比较多的话,可以分成多个Cluster,每个Cluster提供一个业务集群使用
-
brokerName=broker-aBroker的名称 ,Master和Slave通过使用相同的Broker名称表明相互关系的
-
brokerId=0一个Master Broker可以有多个Slave Broker ,0表示是Master,大于0表示Slave Broker
-
fileReservedTime=48在磁盘中保存消息的时长,是小时,自动删除超时消息
-
deleteWhen=04几点做消息删除动作,凌晨4点
-
brokerRole=SYNC_MASTER三种角色:SYNC_MASTER ,SLAVE,ASYNC_MASTER,SYNC和ASYNC表示的是同步消息机制。SYNC
表示master和slave 同步完成才发送消息,返回消息。
-
flushDiskType=ASYNC_FLUSHASYNC_FLUSH和SYNC_FLUSH 分别是:异步刷盘和同步刷盘,同步刷盘消息真正要写入磁盘才返回状态,异步,先保存在page_cache中后就返回状态
-
listenPort=10911端口号,各台机器端口号不能有重复
-
storePathRootDir=/home/rocketmq/store-a 配置消息存储路径
发送消息/接受消息
//设置producerGroup的名字 DefaultMQProducer producer=new DefaultMQProducer("please_rename_unique_group_name"); //producer.setInstanceName("instance1"); producer.setRetryTimesWhenSendFailed(3); //设置暂存地址 producer.setNamesrvAddr("192.168.91.128:9876;192.168.91.129:9876"); //启动 producer.start(); //先模拟的发送一条消息 for (int i=0;i<100;i++){ //构建消息体 Message message=new Message("TopicTest11","TagA",("HelloWorld RocketMQ"+i).getBytes()); //发送消息 SendResult sendResult=producer.send(message); System.out.printf("%s%n",sendResult); } //发送完成关闭 producer.shutdown();
//接受消息,根据发送者的group DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("please_rename_unique_group_name"); //去到暂存里面取消息 consumer.setNamesrvAddr("192.168.91.128:9876;192.168.91.129:9876"); // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest11","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.printf(Thread.currentThread().getName()+"Receive New Message"+list+"%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); //consumer.shutdown();
常用命令
地址:http://rocketmq.apache.org/docs/cli-admin-tool/
不同的消费类型
DefaultMQPushConsumer使用
主要设置各种参数和传入处理消息函数的。
Rocket支持两种模式
clusting:同一个ConsumerGroup里相同的Consumer只能消费所订阅消息的部分内容
同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容整体。
broadcasting:同一个ConsumerGroup里的每个Consumer都可以消费所订阅的所有消息
nameServer 地址和端口 可以多写,用逗号隔开,达到消除单点故障的的目的
Topic:topic 用来标识消费类型,需要提前创建,如果要不需要消费某个消息用Consumer.suscribe("TopicTest","tag || tag1 ||tag2");消费全部消息可以设置为null或者*
DefaultMQPushConsumer处理流程
DefaultMQPushConsumer ->DefaultMQPushConsumerImpl
switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispathToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; case NO_NEW_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; }
PushConsumer 有pullRequest?
Push方式的弊端:加大Server端的工作量,影响性能,client处理能力不同,client的状态不受server控制。
Pull方式:Client端循环的从Server拉取消息。主动权在client中,自己拉取之后,消费后,再去取
pull优点:长轮询即能有pull的优点,又有实时性。
DefaultMQPushConsumerImpl.java->PullAPIWrapper.java 方法: pullKernelImpl()>BROKER_SUSPEND_MAX_TIME_MILLIS=1000*15
堵塞时间15秒如果没消息就堵塞,有消息就立刻返回
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType);
PullRequestHoldService.java
-
ServiceThread.java
log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName());
每次默认时间是5秒,如果队列里面没有消息,通过一个线程去循循环查看状态,如果第三次check的时候等待时间超过request请求的时间就返回空。
注意:长轮询主动权是在Consumer
DefaultMQPushConsumer流量控制
this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
每个MessageQueue 对应一个ProcessQueue
ProcessQueue对应一个锁和Map, TreeMap是以MessageQueue的offset作为key,以消息内容作为value
读写锁控制多个线程TreeMap对象的访问
TreeMap的结构(有序集合)
-
TreeMap
-
AbstractMap(extends),NavigableMap(implements)
-
Map(implements) ,SortedMap(extends)
-
-
long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); //通过判断未处理消息个数和总大小控制是否继续请求消息 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } //对消息的处理根据消息的 if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } } else { if (processQueue.isLocked()) { if (!pullRequest.isLockedFirst()) { final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); } pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } } else { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.info("pull message later because not locked in broker, {}", pullRequest); return; } } final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.warn("find the consumer's subscription failed, {}", pullRequest); return; }
上面消息个数,消息总大小,offset的跨度,任何一个值超过都会隔一段时间在拉取消息
public class PullConsumer { private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.start(); //获取messageQueue进行遍历 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); //遍历多个messageQueue for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { //放入pullResult结果里面 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); //维护offsetstore putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); //根据不同的消息状态做出不同的处理 switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } }
-
获取Message Queue并遍历
-
维护offsetstore
-
根据不同的状态做不同的处理
Consumer的启动,关闭流程
-
用shutdown()释放资源保存offset
-
pushConsumer启动前做各种配置检查,然后返回链接NameServer获取Topic信息
不同类型的生产者
DefaultMQProducer步骤及状态
发送消息要经过五个步骤
-
设置Producer的GroupName
-
设置InstanceName 通过 instance进行分区
-
设置失败重试次数
-
设置NameServer地址
-
组装消息发送
消息发送的状态
-
FLUSH_DISK_TIMEOUT:表示没有在规定时间内刷盘
-
FLUSH_SLAVE_TIMEDOUT:表示主备方式下,没有在设定时间按照主从同步
-
SLAVE_NOT_AVAILABLE:这个状态是表示没有找到主从配置Slave
-
SEND_OK:发送消息成功。
发送消息延迟
setDelayTimeLevel(int level) 设置消息延迟
例如:setDelayTimeLevel(3)消息延迟10s
自定义消息发送
public class OrderMessageQueueSelector implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object orderKey) { int id=Integer.parseInt(orderKey.toString()); int idMaxIndex=id/100; int size=mqs.size(); int index=idMaxIndex; return mqs.get(index); } }
上面是使用 public MessageQueue select(List<MessageQueue> mqs, Message message, Object orderKey)函数发送消息的,实现MessageQueueSelector,根据传入的Object参数,或者是根据Message消息确定把消息发往那个Message queue,返回被选中的queue
对事物的支持
如何存储队列位置信息的
通过offset 确认Topic的一条记录在message queue的位置
-
offsetStore
-
LocalFileOffsetStore,RometeBrokerOffsetStore
-
Namesrv的功能
namesrv是整个队列中的状态服务器。各个机器要上报自己的状态给Namesrv,如果不上报证明这台机器有问题。其他组件会把他从集群机器中移除。namesrv可以多个部署。相互独立的 namesrv本身是无状态的
集群状态的存储结构
-
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;topic是key 存储了Topic的信息 value是QueueData队列。QueueData 队列的长度就是topic的Master Broker个数,QueueData存的是broker名称,读写queue的数量,同步标识。
-
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
一个master和slave,BrokerData包含cluster,BrokerName
-
private final HashMap<String/* clusterName /, Set<String/ brokerName */>> clusterAddrTable;一个clusterName对应一个brokerName
-
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;brokerLiveTable存储的内容是在这台Broker上是实时状态的,包括上次时间戳
-
private final HashMap<String/* brokerAddr /, List<String>/ Filter Server */> filterServerTable;
状态维护
@Override public void onChannelClose(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } @Override public void onChannelException(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } @Override public void onChannelIdle(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); }
Broker和NAMEServer长链接断掉后就会调用onChannelDestroy方法
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES);
NameServer会定期检查时间戳的逻辑,Broker向NameServer发送心跳会更新时间戳
各个角色交互角色间的交互
交互流程源码 UpdateTopicSubCommand.java
Option opt = new Option("b", "brokerAddr", true, "create topic to which broker"); opt.setRequired(false); options.addOption(opt); opt = new Option("c", "clusterName", true, "create topic to which cluster"); opt.setRequired(false); options.addOption(opt); opt = new Option("t", "topic", true, "topic name"); opt.setRequired(true); options.addOption(opt); opt = new Option("r", "readQueueNums", true, "set read queue nums"); opt.setRequired(false); options.addOption(opt); opt = new Option("w", "writeQueueNums", true, "set write queue nums"); opt.setRequired(false); options.addOption(opt); opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]"); opt.setRequired(false); options.addOption(opt); opt = new Option("o", "order", true, "set topic's order(true|false)"); opt.setRequired(false); options.addOption(opt); opt = new Option("u", "unit", true, "is unit topic (true|false)"); opt.setRequired(false); options.addOption(opt); opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false)"); opt.setRequired(false)
MQClientAPIImpl.java
b和c参数比较重要 c参数表示这个Cluster下面所有的Master Broker上创建Topic的Message Queue
final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); requestHeader.setPerm(topicConfig.getPerm()); requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); requestHeader.setOrder(topicConfig.isOrder());
创建Topic的命令发往对应的Broker上,Broker接到创建Topic的请求的后,执行相关的逻辑代码
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); this.brokerController.registerBrokerAll(false, true); return null;
AdminBrokerProcessor.java
向NameServer发送注册信息,完成创建Topic逻辑,其他客户端就会发现新增的Topic,首先要更新Broker信息,然后对每个master角色的broker,创建QueueData对象,如果新建的Topic,添加到QueueData对象中,如果是修改,就删除原先QueueData的对象。
底层通讯机制
Remoting模块
三个方法:
-
void start()
-
void shutdown()
-
void registerRPCHook(RPCHook rpcHook)
RemotingServer 和RemotingClient继承了RemotingService的接口
RemotingCommand进行封装,根据RemotingCommand进行各个模块间的交互
消息队列的核心
消息队存储结构
${storeRoot}/consumequeue/${topicName}/${queueId}/${fileName}${user.home}/store/${commitLog}/${fileName}
-
commitLog顺序写,可以大大的提高写入效率
-
利用操作系统pageCache机制,可以批量从磁盘中读取,作为cache存到内存中,加速后续的 读取速度
-
consumer只是存储偏移量的信息,Commitlog可以保证数据完整
高可用性
-
master出现故障可以在slave读取数据
-
创建Topic,把Topic的多个Message Queue创建多个Broken
组,当一个组的master不可用后,其他组的master还是可以用的
Producer仍然发送消息,Slave不可以自动转master。需要手动修改配置文件
然后再启动修改后的文件
同步刷盘和异步刷盘
-
异步刷盘:在返回成功状态时候,消息可能只是被写入了内存的PAGECACHE,写操作返回快
吞吐量大,当内存的消息量累计到一定程度时候,统一触发写入磁盘快速写入
-
同步刷盘方式:在返回写成功状态时候,消息已经被写入磁盘,消息写入pageCache后,立即通
知刷盘现成进行刷盘
同步复制和异步复制
同步复制:Master和slave同步成功,返回给客户端
异步复制:只要master成功,就可以返回给客户端
可高性消息使用场景
全局顺序消息
要保证全局顺序消息 把读写队列设置为1,然后Producer和Consumer设置为1,这样就是单线程处理。
部分顺序消息
要保证部分消息有序,发送端和消费端的配置,在发送端,要把同一个业务Id的消息发送到Message Queue
要做到MessageQueue不被并发处理。
public class OrderMessageQueueSelector implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object orderKey) { int id=Integer.parseInt(orderKey.toString()); int idMaxIndex=id/100; int size=mqs.size(); int index=idMaxIndex; return mqs.get(index); } }
这段代码就是进行有序执行的计算
消息重复解决方式
-
利用消费逻辑的幂等性(多次调用和一次调用效果相同)
-
维护自己的消息,查看消息是否被消费过
动态增减机器
动态添加NameServer
-
通过代码设置 setNamesrv
-
通过java启动项设置
-
通过linux环境配置
-
通过http方式设置
动态添加Broker
故障处理
-
Broker正常关闭,启动
-
Broker 异常Crash,然后启动
-
OS Crash重启
-
机器断电,但能马上恢复供电。
-
磁盘损坏
-
CPU,主板,内存等相关设备损坏
解决方式
-
多Master,每个master都有slave
-
主从设置为SYNC_MASTER
-
Producer用同步的方式写
-
刷盘策略设置成SYNC_FLUSH
消息优先级
-
消息大的,单独设置一个Topic来处理,防止其他共享的topic在等待
-
进行限制消息队列的个数,通过多少时候可以执行某些队列,分批执行
消息队列过滤
-
通过Tag进行过滤
通过tag对应的hashcode,经过hashcode对比,从commitLog读取出来,防止hash冲突
-
通过sql表达式进行过滤
利用msg.putUserProperty设置过滤范围字段
用MessageSelector进行筛选
-
filter Server进行过滤
通过实现MessageFilter进行过滤
提高consumer处理能力
-
提高消费并行度
-
以批量方式执行消费
-
检测延迟,跳过不重要的消息
Consumer负载均衡
启动多个Consumer,用算法分配Conumser从Broker获取全局消息然后自己做负载均衡
DefaultMQPushConsumer的负载均衡
负载均衡的结果和Topic的MessageQueue数量及ConsumerGroup数量有关系
提高Produer发送速度
-
用oneWay方式发送,只发送不等待
-
使用多个Producer同时发送,RoketMQ引入了并发窗口,讲并发数据写入DirectMem中。
NameServer源码分析
在NamesrvStartup.java中
public static main0(String[] args) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); try { //PackageConflictDetect.detectFastjson(); Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, " + file + "%n"); in.close(); } } if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, namesrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); controller.start(); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf(tip + "%n"); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
-c是指定配置文件的位置,-p是打印所有配置信息
初始化NameServer的controller
// remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); }
controller.initialize()初始化信息 , controller.start()开启Namesrv的服务
Namesrv控制逻辑
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES);
private int serverWorkerThreads = 8;
默认是8个线程,还有连个定时任务,一个用来扫描失效的broker(scanNotActiveBroker),一个用来打印配置信息(printAllPeriodically)
启动通讯服务remotingServer 监听一个写端口,收到Broker,Client等发送过来的请求。根据请求的不同,调用不同的Processor来处理。remotingServer基于Netty封装的网络通讯服务。
业务逻辑
switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null;
通过不同的RequestCode处理不同的函数。