RocketMQ Source Code Analysis (4): NameServer

Version notes

  1. Based on rocketmq-all-4.3.1
  2. Corrections are welcome if you spot any mistakes in this analysis. Thanks!

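Introduction to NameServer

  1. NameServer achieves high availability by deploying multiple NameServer instances. The instances are independent of one another and do not communicate (so the data held by different NameServers is not strongly consistent); the failure of any one instance does not affect the others
  2. Responsibilities
    • Maintain the address list of active Brokers, both Master and Slave
    • Maintain all Topics and the address list of the queues for each Topic
    • Maintain the Filter Server list of every Broker
  3. Relationship between Broker and NameServer
    • Each Broker keeps a long-lived connection to every NameServer
    • Every 30 seconds a Broker sends a heartbeat to all NameServers; the heartbeat carries the Broker's own topic information
    • Every 10 seconds the NameServer scans all live Broker connections; if a connection has not sent a heartbeat within 2 minutes, the NameServer closes it
    • After starting, a Broker registers with all NameServers. Before sending a message, a Producer first fetches the Broker address list from a NameServer, then picks one Broker from that list according to its load-balancing algorithm and sends the message to it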
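
The heartbeat and scan timing described above can be illustrated with a small sketch. This is not the RocketMQ implementation: `LiveBrokerTable` and its methods are hypothetical names, and time is passed in explicitly so the behaviour is easy to follow. It only shows the idea of recording the last heartbeat per broker address and dropping entries that have been silent for longer than two minutes.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, simplified stand-in for the NameServer's live-broker bookkeeping.
class LiveBrokerTable {
    // Same two-minute window the article describes.
    static final long EXPIRE_MILLIS = 2 * 60 * 1000L;

    // brokerAddr -> timestamp (ms) of the last heartbeat received
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Called whenever a heartbeat arrives from a broker.
    synchronized void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Called periodically (every 10 seconds in the article).
    // Removes brokers whose last heartbeat is older than the window
    // and returns how many were removed.
    synchronized int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MILLIS < nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    synchronized boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```

Because the scan only runs every 10 seconds, a broker can stay listed for a short while after its two-minute window has passed, which is one reason the live table is not a real-time view.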

KVConfigManager

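  1. An in-memory KV store that supports create, read, update, and delete operations and can persist its data. At its core it is a HashMap keyed by namespace, where each value is a HashMap of key/value pairs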

       
        public class KVConfigManager {
            private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

            private final NamesrvController namesrvController;
        
            private final ReadWriteLock lock = new ReentrantReadWriteLock();
            // namespace -> (key -> value)
            private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
                new HashMap<String, HashMap<String, String>>();
        
            public KVConfigManager(NamesrvController namesrvController) {
                this.namesrvController = namesrvController;
            }
        
            public void load() {
                String content = null;
                try {
                    content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
                } catch (IOException e) {
                    log.warn("Load KV config table exception", e);
                }
                if (content != null) {
                    KVConfigSerializeWrapper kvConfigSerializeWrapper =
                        KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
                    if (null != kvConfigSerializeWrapper) {
                        this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                        log.info("load KV config table OK");
                    }
                }
            }  
        }  
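
To make the "nested HashMap guarded by a ReadWriteLock" idea concrete, here is a minimal sketch of a namespaced KV store. `NamespacedKvStore` and its method names are hypothetical and chosen for illustration; persistence to a file is deliberately left out, so this is not a reproduction of KVConfigManager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: namespace -> (key -> value), protected by a read/write lock.
class NamespacedKvStore {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Map<String, String>> table = new HashMap<>();

    // Writers take the exclusive lock.
    void put(String namespace, String key, String value) {
        lock.writeLock().lock();
        try {
            table.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock, so concurrent lookups do not block each other.
    String get(String namespace, String key) {
        lock.readLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            return kv == null ? null : kv.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Returns the removed value, or null if the key was not present.
    String delete(String namespace, String key) {
        lock.writeLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            return kv == null ? null : kv.remove(key);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

A plain HashMap is not thread-safe, which is why every access in the sketch goes through the lock; reads vastly outnumber writes for configuration data, so a read/write lock is a reasonable fit.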
    

NameServer source code analysis

RouteInfoManager

  1. RouteInfoManager holds all Topic and Broker information

        public class RouteInfoManager {
            private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
            private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
            private final ReadWriteLock lock = new ReentrantReadWriteLock();
            // queue information for each topic
            private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
            // broker address information
            private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
            // cluster membership: which brokers belong to each cluster
            private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
            // brokers currently considered alive (not real-time)
            private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
            // filter servers attached to each broker
            private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
        
            public RouteInfoManager() {
                this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
                this.brokerAddrTable = new HashMap<String, BrokerData>(128);
                this.clusterAddrTable = new HashMap<String, Set<String>>(32);
                this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
                this.filterServerTable = new HashMap<String, List<String>>(256);
            }
            // ... remaining methods omitted ...
        }
    
    
    


  2. Contents observed through remote debugging (two masters, two slaves, two NameServers)

    • IP address list

      • rocketmq-slave2 172.19.0.7
      • rocketmq-slave1 172.19.0.6
      • rocketmq-master2 172.19.0.5
      • rocketmq-master1 172.19.0.4
      • rocketmq-nameserver2 172.19.0.3
      • rocketmq-nameserver1 172.19.0.2
    • topicQueueTable contents: [debugger screenshot]

    • brokerAddrTable contents: [debugger screenshot]

    • clusterAddrTable contents: [debugger screenshot]

    • brokerLiveTable contents: [debugger screenshot]
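
As a rough illustration of how these tables work together, the sketch below joins a topic table with a broker address table to answer "which addresses serve this topic?". The types are heavily simplified stand-ins (plain strings instead of QueueData and BrokerData), and `RouteLookupSketch`, `register`, and `addressesFor` are hypothetical names. The broker name `broker-a` and port 10911 in the usage note are assumptions as well; only the IP addresses come from the list above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a two-table route lookup.
class RouteLookupSketch {
    // topic -> names of the brokers that hold queues for it
    final Map<String, List<String>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> address); brokerId 0 is the master
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    void register(String topic, String brokerName, long brokerId, String addr) {
        List<String> names = topicQueueTable.computeIfAbsent(topic, t -> new ArrayList<>());
        if (!names.contains(brokerName)) {
            names.add(brokerName);
        }
        // TreeMap keeps addresses ordered by brokerId, so the master comes first.
        brokerAddrTable.computeIfAbsent(brokerName, b -> new TreeMap<>()).put(brokerId, addr);
    }

    // Step 1: find the broker names for the topic.
    // Step 2: resolve each broker name to its master/slave addresses.
    List<String> addressesFor(String topic) {
        List<String> result = new ArrayList<>();
        for (String brokerName : topicQueueTable.getOrDefault(topic, new ArrayList<>())) {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs != null) {
                result.addAll(addrs.values());
            }
        }
        return result;
    }
}
```

For example, after `register("TopicTest", "broker-a", 0, "172.19.0.4:10911")` and `register("TopicTest", "broker-a", 1, "172.19.0.6:10911")`, a call to `addressesFor("TopicTest")` returns the master address followed by the slave address.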

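BrokerHousekeepingService

  1. BrokerHousekeepingService implements the ChannelEventListener interface and handles Broker connection-state events: when a Broker's channel goes idle, throws an exception, or is closed, the Broker is removed from RouteInfoManager.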

        public class BrokerHousekeepingService implements ChannelEventListener {
            private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
            private final NamesrvController namesrvController;
        
            public BrokerHousekeepingService(NamesrvController namesrvController) {
                this.namesrvController = namesrvController;
            }
        
            @Override
            public void onChannelConnect(String remoteAddr, Channel channel) {
            }
        
            @Override
            public void onChannelClose(String remoteAddr, Channel channel) {
                // channel closed: remove the Broker from RouteInfoManager
                this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
            }
        
            @Override
            public void onChannelException(String remoteAddr, Channel channel) {
                // channel exception: remove the Broker from RouteInfoManager
                this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
            }
        
            @Override
            public void onChannelIdle(String remoteAddr, Channel channel) {
                // channel idle: remove the Broker from RouteInfoManager
                this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
            }
        }
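
The three callbacks above all delegate to the same onChannelDestroy call. A minimal sketch of what such a single cleanup path might look like is shown below. `FakeChannel` and `LiveBrokerRegistry` are hypothetical types, and the real method also has to clean up the other routing tables; the sketch only shows locating the broker by its channel and dropping it.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for a network channel; only object identity matters here.
class FakeChannel {
}

// Hypothetical sketch of channel-driven cleanup.
class LiveBrokerRegistry {
    // brokerAddr -> channel the broker registered over
    private final Map<String, FakeChannel> live = new HashMap<>();

    synchronized void register(String brokerAddr, FakeChannel channel) {
        live.put(brokerAddr, channel);
    }

    // One cleanup path shared by close, exception, and idle events.
    // Returns the address of the broker that was removed, or null if the channel is unknown.
    synchronized String onChannelDestroy(FakeChannel channel) {
        Iterator<Map.Entry<String, FakeChannel>> it = live.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, FakeChannel> entry = it.next();
            if (entry.getValue() == channel) {
                String brokerAddr = entry.getKey(); // read the key before removing the entry
                it.remove();
                return brokerAddr;
            }
        }
        return null;
    }

    synchronized int size() {
        return live.size();
    }
}
```

Funnelling every failure mode into one method keeps the removal logic in a single place, and calling it twice for the same channel is harmless because the second call finds nothing to remove.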
        
    
    

DefaultRequestProcessor

  1. DefaultRequestProcessor is the default request processor. It dispatches on the RequestCode to the corresponding handler; the core method is processRequest()

        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
    
            if (ctx != null) {
                log.debug("receive request, {} {} {}",
                    request.getCode(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    request);
            }
    
    
            switch (request.getCode()) {
                case RequestCode.PUT_KV_CONFIG:
                    return this.putKVConfig(ctx, request);
                case RequestCode.GET_KV_CONFIG:
                    return this.getKVConfig(ctx, request);
                case RequestCode.DELETE_KV_CONFIG:
                    return this.deleteKVConfig(ctx, request);
                case RequestCode.QUERY_DATA_VERSION:
                    return queryBrokerTopicConfig(ctx, request);
                case RequestCode.REGISTER_BROKER:
                    Version brokerVersion = MQVersion.value2Version(request.getVersion());
                    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                        return this.registerBrokerWithFilterServer(ctx, request);
                    } else {
                        return this.registerBroker(ctx, request);
                    }
                case RequestCode.UNREGISTER_BROKER:
                    return this.unregisterBroker(ctx, request);
                case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                    return this.getRouteInfoByTopic(ctx, request);
                case RequestCode.GET_BROKER_CLUSTER_INFO:
                    return this.getBrokerClusterInfo(ctx, request);
                case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                    return this.wipeWritePermOfBroker(ctx, request);
                case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                    return getAllTopicListFromNameserver(ctx, request);
                case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                    return deleteTopicInNamesrv(ctx, request);
                case RequestCode.GET_KVLIST_BY_NAMESPACE:
                    return this.getKVListByNamespace(ctx, request);
                case RequestCode.GET_TOPICS_BY_CLUSTER:
                    return this.getTopicsByCluster(ctx, request);
                case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                    return this.getSystemTopicListFromNs(ctx, request);
                case RequestCode.GET_UNIT_TOPIC_LIST:
                    return this.getUnitTopicList(ctx, request);
                case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                    return this.getHasUnitSubTopicList(ctx, request);
                case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                    return this.getHasUnitSubUnUnitTopicList(ctx, request);
                case RequestCode.UPDATE_NAMESRV_CONFIG:
                    return this.updateConfig(ctx, request);
                case RequestCode.GET_NAMESRV_CONFIG:
                    return this.getConfig(ctx, request);
                default:
                    break;
            }
            return null;
        }
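
The switch above routes each request by its code. The same routing idea can be sketched as a lookup table with a fallback handler. `RequestDispatcher` is a hypothetical class, the numeric codes in the usage note are arbitrary rather than the real RequestCode values, and requests are reduced to plain strings. Note one difference: the real processRequest() returns null for an unknown code, while this sketch always falls back to a default handler.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: request code -> handler, with a default for unregistered codes.
class RequestDispatcher {
    private final Map<Integer, Function<String, String>> handlers = new HashMap<>();
    private final Function<String, String> defaultHandler;

    RequestDispatcher(Function<String, String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    // Associates a handler with one request code, replacing any earlier registration.
    void register(int code, Function<String, String> handler) {
        handlers.put(code, handler);
    }

    // Looks up the handler for the code and applies it; unknown codes use the default.
    String dispatch(int code, String body) {
        return handlers.getOrDefault(code, defaultHandler).apply(body);
    }
}
```

Usage: with `d.register(1, body -> "put:" + body)`, the call `d.dispatch(1, "k=v")` yields `"put:k=v"`, and any unregistered code is answered by the default handler passed to the constructor.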
    
    
    
    

NamesrvStartup

  1. NamesrvStartup is the startup entry point of NameServer. The core of startup is the call to NamesrvController's initialize() method

         public boolean initialize() {
        
                // load the KV config from file into memory; the default file is ${user.home}/namesrv/kvConfig.json
                this.kvConfigManager.load();
                // create the remoting server, passing in the ChannelEventListener that handles connection events
                this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        
                // thread pool for the default request processor; each RequestCode can be given its own thread pool, and this default pool is used when none is set
                this.remotingExecutor =
                    Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        
                // register the default processor, which dispatches on the request code
                this.registerProcessor();
        
                // first run 5 seconds after startup, then every 10 seconds: close connections to brokers that have been inactive for two minutes
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        
                    @Override
                    public void run() {
                        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                    }
                }, 5, 10, TimeUnit.SECONDS);
        
                // first run 1 minute after startup, then every 10 minutes: print the configTable
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        
                    @Override
                    public void run() {
                        NamesrvController.this.kvConfigManager.printAllPeriodically();
                    }
                }, 1, 10, TimeUnit.MINUTES);
        
                if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                    // Register a listener to reload SslContext
                    try {
                        fileWatchService = new FileWatchService(
                            new String[] {
                                TlsSystemConfig.tlsServerCertPath,
                                TlsSystemConfig.tlsServerKeyPath,
                                TlsSystemConfig.tlsServerTrustCertPath
                            },
                            new FileWatchService.Listener() {
                                boolean certChanged, keyChanged = false;
                                @Override
                                public void onChanged(String path) {
                                    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                        log.info("The trust certificate changed, reload the ssl context");
                                        reloadServerSslContext();
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                        certChanged = true;
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                        keyChanged = true;
                                    }
                                    if (certChanged && keyChanged) {
                                        log.info("The certificate and private key changed, reload the ssl context");
                                        certChanged = keyChanged = false;
                                        reloadServerSslContext();
                                    }
                                }
                                private void reloadServerSslContext() {
                                    ((NettyRemotingServer) remotingServer).loadSslContext();
                                }
                            });
                    } catch (Exception e) {
                        log.warn("FileWatchService created error, can't load the certificate dynamically");
                    }
                }
        
                return true;
            }
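
The TLS listener in initialize() follows a small rule: a change to the trust certificate triggers a reload immediately, while the server certificate and private key must both have changed before a reload happens. The sketch below isolates that rule so it can be exercised without any file watching. `TlsReloadTracker` and the file names in the usage note are hypothetical, and the reload is replaced by a counter.

```java
// Hypothetical sketch of the reload decision, with the actual reload replaced by a counter.
class TlsReloadTracker {
    private final String certPath;
    private final String keyPath;
    private final String trustCertPath;
    private boolean certChanged;
    private boolean keyChanged;
    private int reloadCount;

    TlsReloadTracker(String certPath, String keyPath, String trustCertPath) {
        this.certPath = certPath;
        this.keyPath = keyPath;
        this.trustCertPath = trustCertPath;
    }

    // Called once per changed file.
    void onChanged(String path) {
        if (path.equals(trustCertPath)) {
            reload(); // the trust certificate can be reloaded on its own
        }
        if (path.equals(certPath)) {
            certChanged = true;
        }
        if (path.equals(keyPath)) {
            keyChanged = true;
        }
        // Certificate and key form a pair: wait until both are updated, then reload once.
        if (certChanged && keyChanged) {
            certChanged = keyChanged = false;
            reload();
        }
    }

    private void reload() {
        reloadCount++;
    }

    int getReloadCount() {
        return reloadCount;
    }
}
```

Usage: with a tracker built from `"server.pem"`, `"server.key"`, and `"ca.pem"`, a change to `"server.pem"` alone leaves the count at 0, a following change to `"server.key"` raises it to 1, and a change to `"ca.pem"` raises it to 2. Waiting for both halves of the pair avoids loading a new certificate together with a stale key.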
        
    
    
