Yarn之ResourceManager详细分析笔记(一)待续
http://zengzhaozheng.blog.51cto.com/8219051/1438204/
-
ResourceTracker
-
ApplicationMasterProtocol
应用程序的ApplicationMaster通过该协议向ResourceManager注册、申请和释放资源。该协议与上面的协议一样,也采用了“pull模型”,其中在RPC机制中,ApplicationMaster充当RPC client角色,ResourceManager充当RPC server角色。
-
ApplicationClientProtocol
-
客户端通过该协议向ResourceManager提交应用程序、控制应用程序(如杀死job)以及查询应用程序的运行状态等。在该RPC 协议中应用程序客户端充当RPC client角色,ResourceManager充当RPC server角色。
-
与客户端进行交互,处理来自于客户端的请求,如查询应用的运行情况等。
-
启动和管理各个应用的ApplicationMaster:为ApplicationMaster申请第一个Container用于启动它,并在它运行失败时将其重新启动。
-
管理NodeManager,接收来自NodeManager的资源和节点健康情况汇报,并向NodeManager下达管理资源命令,例如kill掉某个container。
-
资源管理和调度,接收来自ApplicationMaster的资源申请,并且为其进行分配。这个是它的最重要的职能。
-
ClientRMService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
/**
 * Excerpt of the ResourceManager service that implements
 * ApplicationClientProtocol. Only the fields and serviceStart() are shown;
 * the "......" lines mark code omitted from this excerpt.
 */
public class ClientRMService extends AbstractService implements
ApplicationClientProtocol {
private static final ArrayList<ApplicationReport> EMPTY_APPS_REPORT = new ArrayList<ApplicationReport>();
private static final Log LOG = LogFactory.getLog(ClientRMService. class );
final private AtomicInteger applicationCounter = new AtomicInteger( 0 );
final private YarnScheduler scheduler; // Scheduler
final private RMContext rmContext; // RM context object; holds most of the RM's runtime information, such as the node list, queue list and application list
private final RMAppManager rmAppManager; // Application manager object
private Server server; // An RPC server
protected RMDelegationTokenSecretManager rmDTSecretManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory( null );
InetSocketAddress clientBindAddress;
// Access-control object. For example, if an application set view permissions when it was submitted, other ordinary users cannot view it.
private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager;
......
/**
 * Creates the RPC server for ApplicationClientProtocol with this object as
 * the protocol implementation, optionally refreshes the service ACLs, and
 * starts the server.
 */
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this .server = // Serves the ApplicationClientProtocol RPC protocol
rpc.getServer(ApplicationClientProtocol. class , this ,
clientBindAddress,
conf, this .rmDTSecretManager,
// Configured RM client thread count, falling back to the built-in default.
conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));
// Enable service authorization? Only when the security-authorization
// flag is set in the configuration (it defaults to false here).
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false )) {
refreshServiceAcls(conf, new RMPolicyProvider());
}
this .server.start();
......
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
/**
 * Excerpt of the RMContext implementation showing the state it holds.
 * The "......" line marks code omitted from this excerpt.
 */
public class RMContextImpl implements RMContext {
// Asynchronous dispatcher. The RM's services and components, together with the event types they handle and emit, are all organized through this dispatcher, which helps improve system throughput.
private final Dispatcher rmDispatcher;
private final ConcurrentMap<ApplicationId, RMApp> applications // Application list
= new ConcurrentHashMap<ApplicationId, RMApp>();
private final ConcurrentMap<NodeId, RMNode> nodes // Node list
= new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<String, RMNode> inactiveNodes // Inactive node list
= new ConcurrentHashMap<String, RMNode>();
// Heartbeat monitor for running "AP"s, as the original note puts it.
// NOTE(review): "AP" is presumably a typo for AM (ApplicationMaster), given the field type - confirm.
private AMLivelinessMonitor amLivelinessMonitor;
// Heartbeat monitor for AMs that have finished running
private AMLivelinessMonitor amFinishingMonitor;
// Used to store the ResourceManager's running state
private RMStateStore stateStore = null ;
// Timeout monitoring for Containers: an application must use an allocated Container to run a task within a certain time (10 minutes by default, per the original author's note), otherwise the Container is reclaimed
private ContainerAllocationExpirer containerAllocationExpirer;
// The fields below are all security-management related objects
private final DelegationTokenRenewer delegationTokenRenewer;
private final AMRMTokenSecretManager amRMTokenSecretManager;
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
......
}
|
-
AdminService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/**
 * Excerpt of the service that implements
 * ResourceManagerAdministrationProtocol. Only the fields are shown; the
 * "....." line marks code omitted from this excerpt.
 */
public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol {
private static final Log LOG = LogFactory.getLog(AdminService. class );
// References to the configuration, scheduler, RM context and the other RM
// services held by this class.
private final Configuration conf;
private final ResourceScheduler scheduler;
private final RMContext rmContext;
private final NodesListManager nodesListManager;
private final ClientRMService clientRMService;
private final ApplicationMasterService applicationMasterService;
private final ResourceTrackerService resourceTrackerService;
// RPC server handle and address fields; how they are populated is not shown in this excerpt.
private Server server;
private InetSocketAddress masterServiceAddress;
// NOTE(review): presumably the ACL deciding who may call admin operations - confirm against the omitted code.
private AccessControlList adminAcl;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory( null );
.....
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
/**
 * Handles a NodeManager registration request (excerpt).
 *
 * The visible part reads the node's identity and resources from the request
 * and rejects hosts that the nodes list manager does not consider valid by
 * returning a response whose node action is SHUTDOWN. The "....." line marks
 * the remainder of the method, which is omitted from this excerpt.
 */
@SuppressWarnings ( "unchecked" )
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException {
NodeId nodeId = request.getNodeId(); // NodeId carried in the NodeManager's request
String host = nodeId.getHost(); // Host of the node the NodeManager runs on
int cmPort = nodeId.getPort(); // Port of the node the NodeManager runs on
int httpPort = request.getHttpPort(); // HTTP port exposed externally
Resource capability = request.getResource(); // Resource limit of the node the NodeManager runs on
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse. class );
// Check if this node is a 'valid' node
// (validates the node's host name)
if (! this .nodesListManager.isValidNode(host)) {
String message =
"Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager." ;
LOG.info(message);
response.setDiagnosticsMessage(message);
response.setNodeAction(NodeAction.SHUTDOWN);
return response;
}
.....
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
/**
 * Processes a single heartbeat sent by a NodeManager.
 *
 * <p>The heartbeat goes through these stages, in order:
 * <ol>
 *   <li>Registered-node check: a node id not present in the RM context's
 *       node map is answered with the shared {@code resync} response.</li>
 *   <li>The ping is recorded with the liveliness monitor, then the node's
 *       host is validated; an invalid host triggers a DECOMMISSION event and
 *       the shared {@code shutDown} response.</li>
 *   <li>Freshness check on response ids: a duplicate heartbeat gets the
 *       previous response again; a heartbeat that is further behind triggers
 *       a REBOOTING event and the {@code resync} response.</li>
 *   <li>Otherwise a new NORMAL response with the next response id is built,
 *       stored on the node, populated with keys, and the reported status is
 *       forwarded to the RMNode through the dispatcher.</li>
 * </ol>
 *
 * @param request the heartbeat request carrying the node's status
 * @return the response to send back to the NodeManager
 */
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    throws YarnException, IOException {
  // Node status (including health) as reported by the RPC client.
  final NodeStatus reportedStatus = request.getNodeStatus();
  final NodeId nodeId = reportedStatus.getNodeId();

  // Stage 1: the node must already be registered.
  final RMNode node = this.rmContext.getRMNodes().get(nodeId);
  if (node == null) {
    final String notFound = "Node not found resyncing " + nodeId;
    LOG.info(notFound);
    resync.setDiagnosticsMessage(notFound);
    return resync;
  }

  // Record the ping before any further validation.
  this.nmLivelinessMonitor.receivedPing(nodeId);

  // Stage 2: the node must be valid (i.e. not excluded).
  final boolean hostAllowed = this.nodesListManager.isValidNode(node.getHostName());
  if (!hostAllowed) {
    final String disallowed = "Disallowed NodeManager nodeId: " + nodeId
        + " hostname: " + node.getNodeAddress();
    LOG.info(disallowed);
    shutDown.setDiagnosticsMessage(disallowed);
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
    return shutDown;
  }

  // Stage 3: the heartbeat must be 'fresh', i.e. not a duplicate.
  final NodeHeartbeatResponse previousResponse = node.getLastNodeHeartBeatResponse();
  final int reportedId = reportedStatus.getResponseId();
  final int previousId = previousResponse.getResponseId();

  if (reportedId + 1 == previousId) {
    LOG.info("Received duplicate heartbeat from node " + node.getNodeAddress());
    return previousResponse;
  }

  if (reportedId + 1 < previousId) {
    final String tooFarBehind = "Too far behind rm response id:" + previousId
        + " nm response id:" + reportedId;
    LOG.info(tooFarBehind);
    resync.setDiagnosticsMessage(tooFarBehind);
    // TODO: Just sending reboot is not enough. Think more.
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
    return resync;
  }

  // Build the regular heartbeat response carrying the next response id.
  final NodeHeartbeatResponse freshResponse =
      YarnServerBuilderUtils.newNodeHeartbeatResponse(
          previousId + 1, NodeAction.NORMAL, null, null, null, null,
          nextHeartBeatInterval);
  node.updateNodeHeartbeatResponseForCleanup(freshResponse);
  populateKeys(request, freshResponse);

  // Stage 4: forward the status to the RMNode, saving the latest response.
  this.rmContext.getDispatcher().getEventHandler().handle(
      new RMNodeStatusEvent(
          nodeId,
          reportedStatus.getNodeHealthStatus(),
          reportedStatus.getContainersStatuses(),
          reportedStatus.getKeepAliveApplications(),
          freshResponse));
  return freshResponse;
}
|