RocketMQ原理学习-- Name Server
Name Server作为RocketMQ的一个组件,其作用就是一个注册中心,用于管理Broker相关的一些信息,生产者和消费者可以从Name Server中获取Broker中相关的Topic信息等,Name Server可以单台部署也可以多台部署,相互之间不存在联系。
Name Server主要有以下两个功能:
- 维护一份Broker信息(集群名称、Broker名称及相关地址信息),Broker在启动后会将自身信息注册到Name Server中
- 维护每个Topic相关的信息,Broker心跳时会将自身的topic提交到Name Server,生产者在发送消息时会根据Topic名称获取Broker的列表,消费者在监听Topic时也会根据Topic名称从Name Server中获取相关Broker的信息。
Name Server主要在RouteInfoManager中维护了以下几个列表:
//每个Topic的列表信息 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //每个Broker的地址列表 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //集群名称对应的Broker名称 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; //每个broker地址信息 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; //每个broker地址相关的过滤器信息 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
Name Server 启动的时序图
接下来我们通过分析源码简单了解一下:
1、请求处理器DefaultRequestProcessor:
在DefaultRequestProcessor中提供processRequest方法,会根据请求中的RequestCode值调用不同的处理操作。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
//注册Broker信息
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
//删除Broker信息
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
//获取Topic信息
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
//获取Broker的机器信息
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
//获取所有Topic信息
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
//删除Topic
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
//通过命名空间获取配置信息
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
//获取Topic信息
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
//获取系统Topic
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
//更新NameServer的配置
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
//获取Name Server的配置信息
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
2、Broker注册:
(1)Broker注册请求头信息
(2)Broker注册请求体信息
根据请求头及请求体中的信息将相关Broker和Topic信息添加到RouteInfoManager中。
3、Producer查询Topic信息:
(1)Producer请求头信息
根据Topic名称TopicA-Test从 RouteInfoManager的topicQueueTable中获取相关队列信息。
调用代码如下:
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
相关博客: