Hadoop-MapReduce

一、概述:

  • 以WordCount为例,描述MapReduce的执行过程。
  • 特记(原语):相同的key为一组,调用一次reduce方法,方法内迭代这一组数据进行计算

二、过程(3个map、4个reduce): (红色虚线框框属于reduce流程,左边属于map流程。绿色框框属于Shuffle:描述着数据从map task输出reduce task输入的这段过程)

Hadoop-MapReduce

1、在HDFS中获取数据,假设数据有分为3个block。

2、每个输入分片会默认让一个map任务来处理(2),默认情况下,以HDFS的一个块的大小(默认128M)为一个分片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(3)(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件(4)

  • 溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。比例默认是0.8,也就是当缓冲区的数据值已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。map task的输出结果还可以往剩下的20MB内存中写,互不影响。
  • 当溢写线程启动后,需要对这80MB空间内的key做排序(sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序

3、在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combianer操作,这样做的目的是让尽可能少的数据写入到磁盘。

4、当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并(5)。合并的过程中会不断地进行排序和combiner操作,目的有两个:

  • 尽量减少每次写入磁盘的数据量;
  • 尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件,等待reduce task来拉数据。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以。。

5、Reduce进程启动一些数据copy线程(Fetcher),通过http方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。

6、reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。并对数据进行merge归并(6),如果reduce端接收的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merg.percent决定),则对数据合并后溢写到磁盘中。 

  • 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省空间。reduce没有排序的能力,只有归并的能力,强依赖map的排序,合并操作。

8、合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。
9、Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”(。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDSF上(8)。 

三、注意事项

  • map的数量由切片数决定,切片是逻辑性的。
  • HDFS中一个block可以设置多个分片,多个block也可以设置为1个分片。默认情况下是一个block对应一个map。
  • 切片调用map,以1条记录调用一次map,map输出K、V格式的数据比如:("男":1)("女":1)。
  • 4、原语:相同的key为一组,调用一次reduce方法,方法内迭代这一组数据进行计算

Map -> Reduce

  • N:1 多个map对应一个reduce (select count(*) from user)
  • N:N 多个map对应多个reduce (select count(sex) from user group by sex) 分组
  • 1:1 1个map对应一个reduce (select count(*) from user) 数据量很小的只有一个切片的时候
  • 1:N 1个map对应多个reduce 数据量太小了,只有一个map。这个map里有10条记录,但读进来一条记录,map会输出10000条,10000条里面又有两组。所以可以有2个reduce。

group(key) -> reduce

  • 一个reduce就是一个分区:
  • 1:1 1组对应一个reduce,把男放在一个分区里。
  • N:1 多组对应一个reduce,把男,女放在一个分区里。
  • N:N 多组对应多个reduce,把男,女放在多个分区里。
  • 1:N 1组数据对一多个reduce,不可以。违背了原语。