大数据之路(二)——MapReduce流程详细分析
最近有个任务就是处理上百G的日志文件,为了效率我们首先想到的是用hadoop,而hadoop框架中最重要的一
部分就是MapReduce,下面个人总结下MapReduce的流程:
1、MapRuduce File要处理得文件:File存储在HDFS上,切分成默认64M的Block,存储在多个DataNode节点上
2、MapReduce InputFormat:数据格式定义,例如以\n分割记录,“空格”分割单词
3、MapReduce Split:map函数的输入逻辑概念,一个inputSplit和Block是“1对1”或者“1对多”的关系
4、MapReduce RecordReader:每读取一条记录,调用一次map函数
5、MapReduce Map:执行map函数,在内存中增加数据
6、MapReduce Shuffle:Partion,Sort,Spill,Merge, Combiner,Copy,Memory,Disk……性能优化的大
有可为的地方
6.1、Partitioner:决定数据由哪个Reducer进行处理(例如用hash方法)
6.2、MemoryBuffer:内存缓冲区 三元组数据{"partion" "key" "value"}形式存储 缓冲区大小默
认100M,溢出阀值80M
6.3、Spill:内存缓冲超过80M时,spill线程锁住这80M内容,将数据写到本地磁盘,然后释放内存,
数据溢出前会先进行Sort、Combiner并发送到相同Reduce的key数据,这样可减少partitioner索引量
6.4、Sort:缓冲区按照key进行排序
6.5、Combiner:数据合并,将相同的key value值合并,combine函数其实就是reduce函数
6.6、Spill to Disk:溢写到硬盘 内存缓冲区每次数据溢写都会生成一个溢写文件,将相同partition
的数据存储到一块
6.7、Merge on disk:map后合并溢写文件
7、MapReduce Map Task:Map任务上的输出时按照partitiion和key排序的数据存储文件,一般有多个map task,
每个task有一个输出文件{key1 :[value1,value2...]} {key2 :[value1,value2...]}...
8、MapReduce Copy:将map端的输出文件按照相应的partition,copy到reduce端
9、MapReduce Spill:和Map端的Spill一样
10、MapReduce Sort:以相同的key值为参照排序
11、MapReduce combiner:value值合并
12、MapReduce merge:合并溢出文件
13、reduce函数:function reduce
附加一张官方的shuffle流程图;
下面凭借我自己的理解写一下具体数据在MapReduce过程中的形式变化:
样例数据:this is a hello hello world
a b a
b c b
1、数据被inputformat分隔为"this is a hello hello world",value1 "a b a ",value2 "b c b ",value3
(注:数据以split形式存储在Block中)
Map开始
2、map(value1) map(value2) map(value3) ----------> {"this",1}{"is",1}{"a",1}{"hello",1}.....{"c",1}{"b",1}
3、partion每个{key value},例如{"this",1} 让this对reduce个数n取模得到m,则{"this",1}----------->{m,"this",1}
4、按照key值进行sort,例如:{1,"a",1}{1,"b",1}{1,"a",1}------->{1,"a",1}{1,"a",1}{1,"b",1}
5、combine {1,"a",1}{1,"a",1}{1,"b",1} --------> {1,"a",2}{1,"b",1}
merge 如果没有设置上述combiner 则 {1,"a",1}{1,"a",1}{1,"b",1} ------> {1,"a",[1,1]}{1,"b",[1]}
6、map输出{1,"a",[1,1]}{1,"b",1}... ,这些数据被copy到1号reduce(因为partition的值为1)
(注:以上的输出数据没有经过combine)
Reduce开始
7、sort{1,"a",[1,1]}{1,"b",[1]}{1,"c",[1]}{1,"b",[1,1]}---------> {1,"a",[1,1]}{1,"b",[1]}{1,"b",[1,1]}{1,"c",[1]}
8、combine {1,"a",[1,1]}{1,"b",[1]}{1,"b",[1,1]}{1,"c",[1]}--------> {1,"a",[2]}{1,"b",[1]}{1,"b",[2]}{1,"c",[1]}
9、merge {1,"a",[2]}{1,"b",[1]}{1,"b",[2]}{1,"c",[1]}------>{1,"a",[2]}{1,"b",[1,2]}{1,"c",[1]}
10、reduce {1,"a",[2]}{1,"b",[1,2]}{1,"c",[1]}------->{1,"a",[2]}{1,"b",[3]}{1,"c",[1]}
即完成了一个wordcount程序,统计出单词个数a:2 b:3 c:1
(样例数据第一行不具代表性,博主悄悄地把这行数据的处理忽视了......)
其实整个MapReduce流程的最最神奇的地方在shuffle,因为整个MR程序的执行效率全在这个里面进行优化,而平时
我们仅仅是通过split或者string的连接来编写一些简单的map、reduce函数,殊不知得shuffle者得MapReduce,得
MapReduce者得hadoop,得hadoop者得大数据云计算的天下......播主有点神经了......以上分析仅仅是为了梳理一
下自己对mapreduce流程的理解(其实我还是比较凌乱的),并没有上代码,以后慢慢的分析源码并总结吧。
文献参考来源:http://www.slideshare.net/snakebbf/hadoop-mapreduce-12716482
(每次看这个都感触颇深啊)
MR优化博客:http://hongweiyi.com/2012/09/mapred-optimize-writable/
http://langyu.iteye.com/blog/1341267
shuffle分析博客:http://langyu.iteye.com/blog/992916