MapReduce理论笔记
一 介绍
MapReduce是一个用于处理海量数据的分布式计算框架这个框架解决了:
(1) 数据分布式存储
(2) 作用调度
(3) 容错
(4) 机器间通信等复杂问题
MapReduce只负责数据计算,不负责存储,数据是存储在HDFS上,因为HDFS:
系统可靠、可扩展、可并发处理
MapReduce 采用多进程的并发方式,优点:多进程的并发方式这种模型便于每个任务占用资源进行控制调配,进程空间是独享的,缺点:多进程这种方式很大一部分限制了那些低延迟的任务,适合用于批量操作,高吞吐离线的。
缺点:
(1) MapReduce过于底层,编写Map,Reduce函数较为困难。
(2) 不是所有算法都能用MapReduce实现。
(3) 不适合实时响应的需求。
二 执行流程
1单机计算
单机简单计算数据流程:
读取数据,对数据进行逻辑处理,write模块对处理后的数据进行输出,固化到本地磁盘上。
2 分布式计算
(1)输入和拆分
Input data:数据输入,都是存储在HDFS中的
InputForput:MR框架基础类之一,进行数据格式化操作,分为两部分:数据分割(Data Split)和记录读取器(Record Reaer)
Data Split:多条记录的集合构成split,每个split包含后一个Block中开头部分(解决记录跨block的问题)
Record Reader:每读取一条记录,调用一次map函数,直到split的尾部
(2)Map映射
把一组键值对映射成一组新的键值对;
每条记录调用执行一次map()函数,就会在内存中增加数据。
(3)Shuffle派发(map到reduce)
Shuffle过程是指Mapper产生的直接输出结果,经过一系列的处理,成为最终的Reducer直接输入数据为止的整个过程。这是mapreduce的核心过程。该过程可以分为两个阶段:
Mapper端的Shuffle:由Mapper产生的结果并不会直接写入到磁盘中,而是先存储在内存中,当内存中的数据量达到设定的阀值时,一次性写入到本地磁盘中(并不是HDFS)。并同时进行sort(排序)、combine(合并)、partition(分桶)等操作。其中,sort 是把 Mapper产生的结果按照key值进行排序;combine是把key值相同的记录进行合并;partition是把数据均衡的分配给 Reducer。
Reducer端的Shuffle:由Mapper和Reducer往往不在同一个节点上运行,所以Reducer需要从多个节点上下载 Mapper的结果数据,并对这些数据进行处理,然后才能被Reducer处理。
(4)Reduce 缩减:
Reducer接收形式的数据流,形成形式的输出,具体的过程可以由用户自定义,多个reduce任务输出的数据都属于不同的partition,因此结果数据的key不会重复,合并reduce的输出文件即可得到最终的结果,最终结果直接写入hdfs,每个 reduce 进程会对应一个输出文件,名称以 part-开头。
4 MapReduce执行流程图
这个图跟上面Hadoop计算流程很相似,这里更细节一点。
Shuffle:Partion, Sort, Spill, Meger, Combiner,Copy, Memery, Disk……(Shuffle上可以做很多性能优化)
MemoryBuffer:每个map的结果和partition处理的结果都保存在缓存中,缓冲区大小:默认100M,溢写阈值:100M * 0.8 = 80M
Sort:缓冲区数据按照key进行排序
Spill:内存缓冲区达到阈值时,溢写spill线程锁住这80M的缓冲区,开始将数据写出到本地磁盘中,然后释放内存。每次溢写都生成一个数据文件。溢出的数据到磁盘前会对数据进行key排序sort以及合并combiner,发送相同Reduce的key数量,会拼接到一起,减少partition的索引数量
Partitioner:决定数据由哪个Reducer处理,从而分区
Combiner:数据合并,相同的key的数据,value值合并,目的有两个:
一是尽量减少每次写入磁盘的数据量;二是尽量减少下一次复制阶段网络传输的数据量。
归并排序:开始把指针都指向各自文件的文件头上,然后开始互相之间比较,就是比如三个文件,每一个文件都有一个指针,然后相当于每一个指针都相当于按照从大到小已经排序好的,那么这个时候去往这个大的数据上开始合并。归并排序的前提是你指的每一个数据的都是先排好序的,这就是归并排序。然后做好归并排序之后产生大的文件也是排好序的。
5 map和reduce的设置
(1) map
一个map任务处理的输入不能跨文件
map任务总数不超过平台可用的任务槽位
map个数为split的份数
压缩文件不可切分
非压缩文件和sequence文件可以切分
dfs.block.size决定block大小
(2) reduce
reduce个数设置:mapred.reduce.tasks,默认为1
reduce个数太少:单次执行慢,出错再试成本高
reduce个数太多:shuffle开销大,输出大量小文件
(3) 集群控制
对单个MapReduce
Map个数最好为集群slot的倍数
Reduce个数最好为集群slot的个数、倍数
多个MapReduce:节奏控制
三 工作原理
两个重要的进程
JobTracker主进程:
- 负责接收客户作业提交,调度任务到作节点上运行
- 监控工作节点状态及任务进度等
- Jobtracker利用一个线程池来同时处理心跳和客户请求
- 一个MapReduce集群只有一个jobtracker。
tasktracker工作节点:
- TaskTracker是具体的job执行者
- 通过周期性(3秒)的心跳来通知jobtracker其当前的健康状态,每一次心跳包含了可用的map和reduce任务数目、占用的数目以及运行中的任务详细信息。
- 在每一个工作节点上永远只会有一个tasktracker,但集群中会有多个tasktracker。
在一个TaskTracker中,从JobTracker获取的job会被分成Map task 和Reduce task,分别由Mapper 和 Reducer 来执行。
四 工作流程
当用户提交一个MapReduce作业时:
1) 通过客户端节点(client node)提交MapReduce任务请求。
2) Jobtracker接受到任务后,将任务拆分,调度给空闲的tasktracker,tasktracker在执行任务时,会对Jobtracker返回进度报告,Jobtracker则会记录任务进行情况,一旦出现某个tasktracker任务执行失败,Jobtracker则会把任务分配给另一台tasktracker,直到任务完成为止。
3) 对于每一个job, tasktracker会将该任务分为Map task 和Reduce task 分别执行。Map task会在tasktracker的Map槽点执行,结果的汇总计算工作会在 tasktracker 的 Reduce槽点执行,最终返回Reduce task的结果。
数据本地化:
一般地,将 NameNode 和 JobTracker 部署到同一台机器上,各个 DataNode 和 TaskNode 也同样部署到同一台机器上, 这样做的目的是将 map 任务分配给含有该 map 处理的数据块的 TaskTracker 上,同时将程序 JAR 包复制到该 TaskTracker 上来运行,这叫“运算移动,数据不移动”。而分配reduce 任务时并不考虑数据本地化。
多副本,目的是容错,数据层面做到高可用
错误处理:
TaskTracker心跳回应失败问题:
如果此时执行到Map任务,交由其他TaskTracker节点重新执行该任务,如果此时执行到Reduce任务,交由其他TaskTracker重新执行Reduce任务,不用再从map任务开始;
TaskTracker自身执行任务失败:
重新向JobTracker申请新任务,这样的失败次数有4次(可设置),再之后就整个作用也就失败了。
五 streaming开发
streaming原理
1.Strieaming的优点:
1) 开发效率高
– 只需按照一定的格式从标准输入读取数据、向标准输出写数据就可以
– 容易单机调试:cat input | mapper | sort | reducer > output
2) 程序运行效率高
– 对于CPU密集的计算,有些语言如C/C++编写的程序可能比用Java效率更高一些
3) 便于平台进行资源控制
– Streaming框架中通过limit等方式可以灵活地限制应用程序使用的内存等资源
2.Streaming的局限:
1) Streaming默认只能处理文本数据
2) 两次数据拷贝和解析(分割),带来一定的开销
3.Streaming的开发要点:
1) 分发:
-file :把本地文件分发到各个节点
-cachefile:把hdfs的压缩文件分发到各个节点
-archivefile:把hdfs的目录分发到各个节点
2) jobconf:
提交作业的一些配置属性,常见配置:
(1)mapred.map.tasks:map task数目
(2)mapred.reduce.tasks:reduce task数目
(3)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
(4)num.key.fields.for.partition:指定对key分出来的前几部分做partition,而非整个key
(5)mapred.compress.map.output:map的输出是否压缩
(6)mapred.map.output.compression.codec:map的输出压缩方式
(7)mapred.output.compress:reduce的输出是否压缩
(8)mapred.output.compression.codec:reduce的输出压缩方式
其他设置
minSize:splitSize 的最小值,由 mapred-site.xml 配置文件中
mapred.min.split.size 参数确定。
maxSize:splitSize 的最大值,由 mapred-site.xml 配置文件中
mapreduce.jobtracker.split.metainfo.maxsize 参数确定。
blockSize:HDFS 中文件存储的快大小,由 hdfs-site.xml 配置文件中
dfs.block.size 参数确定。
splitSize 的确定规则:splitSize=max{minSize,min{maxSize,blockSize}}
io.sort.mb 缓冲区大小设置
io.sort.spill.percent 默认缓冲区阈值设置
mapred.submit.replication JAR默认文件副本