hdfs写流程和MR缓冲区
一.hdfs的写流程
1. 客户端发起RPC请求到NameNode
2. NameNode收到请求之后,进行校验:
a. 校验用户是否有操作权限
b. 校验这个文件是否存在
3. 记录元数据,计算这个文件的块的数量同时分配块的存储地址
4. NameNode就会将地址放入队列中返回给客户端
5. 客户端收到队列之后,会从队列中取出地址来进行写入
6. 在写的时候,客户端会先将数据进行封包(packet),会选取较近的节点写入数据。写完一个Block之后,当前DataNode自动来建立pipeline(管道,底层是基于Channel来实现)将数据备份到其他节点上满足副本策略
7. 客户端收到ACK之后,会继续写入下一个Block
8.等客户端将所有的Block写完之后,会通知NameNode关闭文件(关流)。关流之后这个文件就不能再改动写流程
![在client创建FIleSystem向NameNode,查询元数据信息,
NameNode返回元信息数据:{文件名:数据.txt ;路径:/abc;数据块:2;数据块1:{dn1.1},{dn2.2};数据块2:{dn2.1},{dn3.2}}。
client创建输入流FSInputStream,根据元数据信息下载第一个数据块。。。第二个数据块
关闭输入流
二.mr的shuffle过程
a.Map端的Shuffle
1. MapTask在拿到切片之后,默认会对数据进行按行读取,每读取一行默认调用一次map方法来进行处理
2. 每一个MapTask默认自带一个缓冲区,map方法执行的结果会临时的写到缓冲区中
3. 缓冲区是维系在内存中,默认是100M
4. 当缓冲区达到一定条件的时候,就会将缓冲区中的数据写到磁盘上,这个过程称之为溢写(spill)
5. 溢写之后,map方法产生的结果会继续写到缓冲区中
6. 多次溢写之后,会产生多个溢写文件,MapTask处理完数据之后,会将所有的溢写文件合并成一个文件,这个过程称之为合并(merge),merge过程不会减少数据量。在merge过程中,如果缓冲区中依然有数据,则将缓冲区中的数据一起合并到 最后的结果文件中(final out)
7. 如果没有产生溢写,则最后会把缓冲区中的数据直接写出到最后的结果文件中
8. 结果写到缓冲区中的时候,结果会在缓冲区中进行分区(partition)、排序(sort)。如果在这个过程中指定了Combiner,那么数据在缓冲区中还会进行combine操作 - 这也就意味着数据在缓冲区中是分区且有序的 - 也同样意味着单个溢写 文件中的数据也是有序且分区的 - 如果整体来看,所有的溢写文件应该是局部分区并且局部有序的 - 数据在缓冲区中排序采用的是快速排序
9. 在merge过程中,会对文件再次进行分区并且排序,所以最后的结果文件中是分区且有序的 - merge过程中的排序采用的是归并排序
10. 如果指定了Combiner,如果溢写文件的个数>=3个,那么merge的时候会再进行一次combine过程
11. 注意问题:
a. 缓冲区本质上是一个字节数组
b. 阈值默认是0.8,即当缓冲区的使用量达到80%的时候,这个时候就会进行溢写
c. 缓冲区中的数据大小不代表溢写文件的大小
d. 输入的数据量也不决定溢写次数
e. 缓冲区是环形缓冲区,好处在于可以重复利用这个缓冲区而不用重复寻址
b.Reduce端的Shuffle
1. ReduceTask会启动fetch线程去Map端抓取数据
2. 在抓取数据的时候,会只抓取当前ReduceTask所对应的分区的数据
3. 抓取完数据之后,会对数据进行merge,将所有的数据合并到一个文件中,并且在合并过程中会进行排序 - 采用的排序机制依然是归并排序
4. 将相同的键所对应的值放在同一组中,产生一个针对值的迭代器,这个过程称之为分组(Group)
5. 分组完成之后,每一个键调用一次reduce方法利用reduce方法来进行处理,最后将处理结果写到HDFS上
6. 注意问题:
a. fetch线程数量默认为5个
b. merge因子默认为10,表示每10个文件合并成1个文件
c. ReduceTask的阈值默认为0.05,即5%的MapTask结束之后,ReduceTask就会启动开始抓取数据
三.开发mr过程中遇到的常见问题及解决办法
a. 数据倾斜
b. 小文件过多
c. fetch死锁
d. redude阶段内存溢出–shuffle溢出
1. shuffle使用的内存比例,默认是0.7
2. 单个shuffle任务能使用的内存限额,默认是0.25,低于此值可以输出到内存,否则输出到磁盘。
3. 默认fetch数是5
4. shuffle的数据量默认到Shuffle内存 * 0.9的时候,启动合并。
5. 内存空间分给fetch后,状态变为allocated,commit后变为committed,只有commit状态的内存可以merge。
场景:1.如果前4个fetch已经使用了全部的shuffle内存的89%,第五个fetch如果继续分配最大内存限额0.25,即最大内存使用率114%,溢出
2.如果前4个fetch已经使用了全部的shuffle内存的99%,且已经提交,不能触发合并,第五个fetch如果继续分配最大内存限额0.25,即最大内存使用率124%,溢出
解决办法:
调整对应的参数或者修改底层代码reserve方法,将分配空间设定为小于shuffle内存75%时分配,否则不分配
四.yarn的资源调度
a. ResourceManager:顾名思义资源管理器,主要负责资源管理和调度,ResourceManager主要由两个组件构成:
ApplicationManager,主要负责两类工作:
1. 管理监控各个系统的应用,包括启动Applicaiton Master,监控Applicaiton Master运行状态(为了容错)
2. 跟踪分配给Application Master的进度和状态。
Scheduler
主要负责分配Container给Applicaiton Master,分配算法有多种(如公平调度等等)可以根据需求不同选择适合的调度策略。
b. NodeManager:节点管理器,主要负责维护本节点的资源情况和任务管理。
1. NodeManager需要定期向ResourceManager汇报本节点资源使用情况,以便ResourceManager,根据资源使用情况,来分配资源给Application Master,
2. 需要管理Applicaiton Master提交来的task,比如接收Applicaiton Master 启动或停止task的请求(启动和停止有NodeManager的组件ContainersLanuncher完成)。
c. ApplicaitonMaster:用户提交的每个program都会对应一个ApplicationMaster,主要负责监控应用,任务容错(重启失败的task)等。它同时会和ResourceManager和NodeManager有交互,向ResourceManager申请资源,请求NodeManager启 动或提示task
d. Container:容器是资源调度的单位,它是内存、cpu、磁盘、和IO的集合。Application Master会给task分配Container,task只能只用分配给它的Container的资源。分配流程为Resource Manager ->Application Master -> task
五.mr的执行流程
1. run job:客户端提交一个mr的jar包给JobClient。
a. 做job环境信息的收集,比如各个组件类,输入输出的kv类型等,检测是否合法
b. 检测输入输出的路径是否合法
2. JobClient通过RPC和ResourceManager进行通信,返回一个存放jar包的地址(HDFS)和jobId。jobID是全局唯一的,用于标识该job
3. client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
4. 开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
5. JobTracker进行初始化任务
6. 读取HDFS上的要处理的文件,开始计算输入切片,每一个切片对应一个MapperTask。注意,切片是一个对象,存储的是这个切片的数据描述信息;切块是文件块(数据块),里面存储的是真正的文件数据
7. TaskTracker通过心跳机制领取任务(任务的描述信息)。切片一般和切块是一样的,即在实际开发中,切块和切片认为是相同的。在领取到任务之后,要满足数据本地化策略
8. 下载所需的jar,配置文件等。体现的思想:移动的是运算,而不是数据
9. TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
10. 将结果写入到HDFS当中
六.lock和synchronize的区别
1.公平
2.类型
3.释放锁
4.死锁
七.zab协议
a.原子广播
1. 针对客户端的事务请求,leader服务器会先将该事务写到本地的log文件中
2. 然后,leader服务器会为这次请求生成对应的事务Proposal并且为这个事务Proposal分配一个全局递增的唯一的事务ID,即Zxid
3. leader服务器会为每一个follower服务器都各自分配一个单独的队列,将需要广播的事务Proposal依次放入队列中,发送给每一个follower
4. 每一个follower在收到队列之后,会从队列中依次取出事务Proposal,写道本地的事务日志中。如果写成功了,则给leader返回一个ACK消息
5. 当leader服务器接收到半数的follower的ACK相应之后,就会广播一个Commit消息给所有的follower以通知其进行事务提交,同时leader自身也进行事务提交
6. leader在收到Commit消息后完成事务提交
b.崩溃恢复
1. 当leader服务器出现崩溃、重启等场景,或者因为网络问题导致过半的follower不能与leader服务器保持正常通信的时候,Zookeeper集群就会进入崩溃恢复模式
2. 进入崩溃恢复模式后,只要集群中存在过半的服务器能够彼此正常通信,那么就可以选举产生一个新的leader
3. 每次新选举的leader会自动分配一个全局递增的编号,即epochid
4. 当选举产生了新的leader服务器,同时集群中已经有过半的机器与该leader服务器完成了状态同步之后,ZAB协议就会退出恢复模式。其中,所谓的状态同步是指数据同 步,用来保证集群中存在过半的机器能够和leader服务器的数据状态保持一致
5. 当集群中已经有过半的follower服务器完成了和leader服务器的状态同步,那么整个服务框架就可以进入消息广播模式了
6. 当一台同样遵守ZAB协议的服务器启动后加入到集群中时,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么新加入的服务器就会自觉地进入数据恢 复模式:找到leader所在的服务器,并与其进行数据同步,然后一起参与到消息广播流程中
八.hadoop的HA
单点问题namenode
九.写出几个常见的hdfs命令,如何通过指令合并下载
hadoop fs -ls /
hadoop fs -put /
hadoop fs -getmerge /目录1 /目录2
十.mr过程的调优
1. Map阶段的调优:
a. 调大缓冲区,一般可以调为250~350M
b. 可以引入combine过程
c. merge之后的文件可以进行压缩,减少网络传输的消耗
2. Reduce阶段的调优:
a. 增多fetch的线程数
b. 降低ReduceTask的阈值
c. 提高merge因子
3. 数据倾斜的调优
a. 分为治之
b. 小表缓存
4. 业务场景
a. 自定义分区
b. 自定义map输入和reduce输出
c. 自定义combiner
5. 资源的调优
硬件相关