HDFS的DataNode源码分析
1.大致流程
DataNode.main() // 入口函数
|——secureMain(args, null);
|——createDataNode(args, null, resources); // 创建DataNode
|——instantiateDataNode(args, conf, resources);
|——getStorageLocations(conf); // 根据配置拿到HDFS的Block实际存储的本地路径,即hdfs-site.xml文件中的dfs.datanode.data.dir属性
|——UserGroupInformation.setConfiguration(conf); // 设置配置
|——makeInstance(dataLocations, conf, resources); // 实例化DataNode
|——dn.runDatanodeDaemon(); // 运行DataNode的后台守护线程
|——datanode.join(); // 将此DataNode放入一个线程等待池
2.详解makeInstance(dataLocations, conf, resources)方法
1. List<StorageLocation> locations = checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
权限检查,拿到可用存储路径
2.new DataNode(conf, locations, resources);
先给成员变量进行一系列赋值
getHostName(conf); // 获取DataNode所在机器的主机名
startDataNode(conf, dataDirs, resources); // 启动DataNode
|——new DataStorage(); // 管理磁盘目录、
|——registerMXBean(); // 注册一个DataNode的Bean
|——initDataXceiver(conf); // 重点方法
|——startInfoServer(conf); // 启动DataNode的http服务,不对外提供服务
|——pauseMonitor.start(); // 构造一个JvmPauseMonitor,然后初始化,再启动,检查JVM是否停顿
|——initIpcServer(conf); // 初始化RPC服务
|——blockPoolManager.refreshNamenodes(conf); // 主要是向NameNode注册,汇报心跳、磁盘块、内存块等信息
3.重点方法initDataXceiver(conf)
new TcpPeerServer(secureResources); // 提供ServerSocket的功能
new DataXceiverServer(tcpPeerServer, conf, this);
|——利用创建好的tcpPeerServer创建DataXceiverServer
|——this.maxXceiverCount =
conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT);
表示datanode上负责进行文件操作的线程数,默认4096
|——this.estimateBlockSize = conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); // Block的大小,默认128M
|——new BlockBalanceThrottler(); // 构建一个平衡节流器
两个参数:第一个参数bandwidth:每个DataNode用来移动数据时,占用带宽的上限,默认10M
第二个参数maxThreads:每台datanode的并行拷贝数,默认50
4.BlockPoolManager