从hadoop框架与MapReduce模式中谈海量数据处理

废话不说直接来一张图如下:

从hadoop框架与MapReduce模式中谈海量数据处理

从JVM的角度看Map和Reduce

Map阶段包括:

第一读数据:从HDFS读取数据

1、问题:读取数据产生多少个Mapper??

    Mapper数据过大的话,会产生大量的小文件,由于Mapper是基于虚拟机的,过多的Mapper创建和初始化及关闭虚拟机都会消耗大量的硬件资源;

    Mapper数太小,并发度过小,Job执行时间过长,无法充分利用分布式硬件资源;

2Mapper数量由什么决定?? 

  1输入文件数目

  2输入文件的大小

  3配置参数

这三个因素决定的。

涉及参数:

    mapreduce.input.fileinputformat.split.minsize //启动map最小的split size大小,默认0
    mapreduce.input.fileinputformat.split.maxsize //启动map最大的split size大小,默认256M
    dfs.block.size//block块大小,默认64M
    计算公式:splitSize =  Math.max(minSize, Math.min(maxSize, blockSize));

    例如默认情况下:例如一个文件800M,Block大小是128M,那么Mapper数目就是7个。6个Mapper处理的数据是128M,1个Mapper处理的数据是32M;

再例如一个目录下有三个文件大小分别为:5M10M 150M 这个时候其实会产生四个Mapper处理的数据分别是5M,10M,128M,22M。

Mapper是基于文件自动产生的,如果想要自己控制Mapper的个数???

就如上面,5M,10M的数据很快处理完了,128M要很长时间;这个就需要通过参数的控制来调节Mapper的个数。

减少Mapper的个数的话,就要合并小文件,这种小文件有可能是直接来自于数据源的小文件,也可能是Reduce产生的小文件。

设置合并器:(set都是在hive脚本,也可以配置Hadoop)

    设置合并器本身:

    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

    set hive.merge.mapFiles=true;

    set hive.merge.mapredFiles=true;

    set hive.merge.size.per.task=256000000;//每个Mapper要处理的数据,就把上面的5M10M……合并成为一个

一般还要配合一个参数:

    set mapred.max.split.size=256000000 // mapred切分的大小

    set mapred.min.split.size.per.node=128000000//低于128M就算小文件,数据在一个节点会合并,在多个不同的节点会把数据抓过来进行合并。

Hadoop中的参数:可以通过控制文件的数量控制mapper数量

    mapreduce.input.fileinputformat.split.minsizedefault0),小于这个值会合并

    mapreduce.input.fileinputformat.split.maxsize 大于这个值会切分

第二处理数据:

Partition说明

对于map输出的每一个键值对,系统都会给定一个partition,partition值默认是通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理。

自定义partitioner的情况:

    1、我们知道每一个Reduce的输出都是有序的,但是将所有Reduce的输出合并到一起却并非是全局有序的,如果要做到全局有序,我们该怎么做呢?最简单的方式,只设置一个Reduce task,但是这样完全发挥不出集群的优势,而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner,用输入数据的最大值除以系统Reduce task数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的。

   2、解决数据倾斜:另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集,由于很多不同的key的hash值都一样,导致这些键值对都被分给同一个Reducer处理,而其他的Reducer处理的键值对很少,从而拖延整个任务的进度。当然,编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。

   3、自定义的Key包含了好几个字段,比如自定义key是一个对象,包括type1,type2,type3,只需要根据type1去分发数据,其他字段用作二次排序。

环形缓冲区

    Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

    这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

    索引是对在kvbuffer中的键值对的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。

第三写数据到磁盘

    Mapper中的Kvbuffer的大小默认100M,可以通过mapreduce.task.io.sort.mbdefault100参数来调整。可以根据不同的硬件尤其是内存的大小来调整,调大的话,会减少磁盘spill的次数此时如果内存足够的话,一般都会显著提升性能。spill一般会在Buffer空间大小的80%开始进行spill(因为spill的时候还有可能别的线程在往里写数据,因为还预留空间,有可能有正在写到Buffer中的数据),可以通过mapreduce.map.sort.spill.percentdefault0.80进行调整,Map Task在计算的时候会不断产生很多spill文件,在Map Task结束前会对这些spill文件进行合并,这个过程就是merge的过程。mapreduce.task.io.sort.factordefault10,代表进行merge的时候最多能同时merge多少spill,如果有100个spill个文件,此时就无法一次完成整个merge的过程,这个时候需要调大mapreduce.task.io.sort.factordefault10)来减少merge的次数,从而减少磁盘的操作;

Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活,干的活叫SortAndSpill,原来不仅仅是Spill,在Spill之前还有个颇具争议性的Sort。

    Combiner存在的时候,此时会根据Combiner定义的函数对map的结果进行合并,什么时候进行Combiner操作呢???和Map在一个JVM中,是由min.num.spill.for.combine的参数决定的,默认是3,也就是说spill的文件数在默认情况下由三个的时候就要进行combine操作,最终减少磁盘数据;

减少磁盘IO和网络IO还可以进行:压缩,对spill,merge文件都可以进行压缩。中间结果非常的大,IO成为瓶颈的时候压缩就非常有用,可以通过mapreduce.map.output.compressdefaultfalse设置为true进行压缩,数据会被压缩写入磁盘,读数据读的是压缩数据需要解压,在实际经验中Hive在Hadoop的运行的瓶颈一般都是IO而不是CPU,压缩一般可以10倍的减少IO操作,压缩的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一种比较平衡选择,mapreduce.map.output.compress.codecdefaultorg.apache.hadoop.io.compress.DefaultCodec参数设置。但这个过程会消耗CPU,适合IO瓶颈比较大。

Shuffle和Reduce阶段包括:

一、Copy

    1、由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。所以,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据,因此map和reduce是交叉进行的,其实就是shuffle。Reduce任务通过HTTP向各个Map任务拖取(下载)它所需要的数据(网络传输),Reducer是如何知道要去哪些机器取数据呢?一旦map任务完成之后,就会通过常规心跳通知应用程序的Application Master。reduce的一个线程会周期性地向master询问,直到提取完所有数据(如何知道提取完?)数据被reduce提走之后,map机器不会立刻删除数据,这是为了预防reduce任务失败需要重做。因此map输出数据是在整个作业完成之后才被删除掉的。

    2、reduce进程启动数据copy线程(Fetcher),通过HTTP方式请求maptask所在的TaskTracker获取maptask的输出文件。由于map通常有许多个,所以对一个reduce来说,下载也可以是并行的从多个map下载,那到底同时到多少个Mapper下载数据??这个并行度是可以通过mapreduce.reduce.shuffle.parallelcopies(default5调整。默认情况下,每个Reducer只会有5个map端并行的下载线程在从map下数据,如果一个时间段内job完成的map有100个或者更多,那么reduce也最多只能同时下载5个map的数据,所以这个参数比较适合map很多并且完成的比较快的job的情况下调大,有利于reduce更快的获取属于自己部分的数据。 在Reducer内存和网络都比较好的情况下,可以调大该参数;

    3、reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断等等情况,这样reduce的下载就有可能失败,所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。reduce下载线程的这个最大的下载时间段是可以通过mapreduce.reduce.shuffle.read.timeoutdefault180000秒)调整的。如果集群环境的网络本身是瓶颈,那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。一般情况下都会调大这个参数,这是企业级最佳实战。

二、MergeSort

     1、这里的merge和map端的merge动作类似,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置。这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percentdefault 0.7f 源码里面写死了) 来设置,这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。JVM的heapsize的70%。内存到磁盘merge的启动门限可以通过mapreduce.reduce.shuffle.merge.percent(default0.66)配置。也就是说,如果该reduce task的最大heap使用量(通常通过mapreduce.admin.reduce.child.java.opts来设置,比如设置为-Xmx1024m)的一定比例用来缓存数据。默认情况下,reduce会使用其heapsize的70%来在内存中缓存数据。假设 mapreduce.reduce.shuffle.input.buffer.percent 为0.7,reducetask的max heapsize为1G,那么用来做下载数据缓存的内存就为大概700MB左右。这700M的内存,跟map端一样,也不是要等到全部写满才会往磁盘刷的,而是当这700M中被使用到了一定的限度(通常是一个百分比),就会开始往磁盘刷(刷磁盘前会先做sortMerge)。这个限度阈值也是可以通过参数 mapreduce.reduce.shuffle.merge.percent(default0.66)来设定。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。这种merge方式一直在运行,直到没有map端的数据时才结束,然后启动磁盘到磁盘的merge方式生成最终的那个文件。

    这里需要强调的是,merge有三种形式:1)内存到内存(memToMemMerger)2)内存中Merge(inMemoryMerger)3)磁盘上的Merge(onDiskMerger)具体包括两个:(一)Copy过程中磁盘合并(二)磁盘到磁盘。

    (1)内存到内存Merge(memToMemMerger)    Hadoop定义了一种MemToMem合并,这种合并将内存中的map输出合并,然后再写入内存。这种合并默认关闭,可以通过mapreduce.reduce.merge.memtomem.enabled(default:false)

打开,当map输出文件达到mapreduce.reduce.merge.memtomem.threshold时,触发这种合并。

    (2)内存中Merge(inMemoryMerger):当缓冲中数据达到配置的阈值时,这些数据在内存中被合并、写入机器磁盘。阈值有2种配置方式:

        配置内存比例:前面提到reduceJVM堆内存的一部分用于存放来自map任务的输入,在这基础之上配置一个开始合并数据的比例。假设用于存放map输出的内存为500M,mapreduce.reduce.shuffle.merge.percent配置为0.66,则当内存中的数据达到330M的时候,会触发合并写入。

   配置map输出数量: 通过mapreduce.reduce.merge.inmem.threshold配置。在合并的过程中,会对被合并的文件做全局的排序。如果作业配置了Combiner,则会运行combine函数,减少写入磁盘的数据量。

    (3)磁盘上的Merge(onDiskMerger):

            (3.1)Copy过程中磁盘Merge:在copy过来的数据不断写入磁盘的过程中,一个后台线程会把这些文件合并为更大的、有序的文件。如果map的输出结果进行了压缩,则在合并过程中,需要在内存中解压后才能给进行合并。这里的合并只是为了减少最终合并的工作量,也就是在map输出还在拷贝时,就开始进行一部分合并工作。合并的过程一样会进行全局排序。

            (3.2)最终磁盘中Merge:当所有map输出都拷贝完毕之后,所有数据被最后合并成一个整体有序的文件,作为reduce任务的输入。这个合并过程是一轮一轮进行的,最后一轮的合并结果直接推送给reduce作为输入,节省了磁盘操作的一个来回。最后(所以map输出都拷贝到reduce之后)进行合并的map输出可能来自合并后写入磁盘的文件,也可能来及内存缓冲,在最后写入内存的map输出可能没有达到阈值触发合并,所以还留在内存中。

   每一轮合并不一定合并平均数量的文件数,指导原则是使用整个合并过程中写入磁盘的数据量最小,为了达到这个目的,则需要最终的一轮合并中合并尽可能多的数据,因为最后一轮的数据直接作为reduce的输入,无需写入磁盘再读出。因此我们让最终的一轮合并的文件数达到最大,即合并因子的值,通过mapreduce.task.io.sort.factordefault10来配置。

如上图:Reduce阶段中一个Reduce过程 可能的合并方式为:假设现在有20个map输出文件,合并因子配置为5,则需要4轮的合并。最终的一轮确保合并5个文件,其中包括2个来自前2轮的合并结果,因此原始的20个中,再留出3个给最终一轮。

三、Reduce函数调用(用户自定义业务逻辑)

    1、当reduce将所有的map上对应自己partition的数据下载完成后,就会开始真正的reduce计算阶段。reducetask真正进入reduce函数的计算阶段,由于reduce计算时肯定也是需要消耗内存的,而在读取reduce需要的数据时,同样是需要内存作为buffer,这个参数是控制,reducer需要多少的内存百分比来作为reduce读已经sort好的数据的buffer大小??默认用多大内存呢??默认情况下为0,也就是说,默认情况下,reduce是全部从磁盘开始读处理数据。可以用mapreduce.reduce.input.buffer.percentdefault 0.0)(源代码MergeManagerImpl.java:674行)来设置reduce的缓存。如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,当reduce计算逻辑消耗内存很小时,可以分一部分内存用来缓存数据,可以提升计算的速度。所以默认情况下都是从磁盘读取数据,如果内存足够大的话,务必设置该参数让reduce直接从缓存读数据,这样做就有点Spark Cache的感觉;

    2、Reduce在这个阶段,框架为已分组的输入数据中的每个 <key, (list of values)>对调用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任务的输出通常是通过调用 OutputCollector.collect(WritableComparable,Writable)写入文件系统的。Reducer的输出是没有排序的。

性能调优

    如果能够根据情况对shuffle过程进行调优,对于提供MapReduce性能很有帮助。相关的参数配置列在后面的表格中。

一个通用的原则是给shuffle过程分配尽可能大的内存,当然你需要确保map和reduce有足够的内存来运行业务逻辑。因此在实现Mapper和Reducer时,应该尽量减少内存的使用,例如避免在Map中不断地叠加。

运行map和reduce任务的JVM,内存通过mapred.child.java.opts属性来设置,尽可能设大内存。容器的内存大小通过mapreduce.map.memory.mbmapreduce.reduce.memory.mb来设置,默认都是1024M

map优化

    在map端,避免写入多个spill文件可能达到最好的性能,一个spill文件是最好的。通过估计map的输出大小,设置合理的mapreduce.task.io.sort.*属性,使得spill文件数量最小。例如尽可能调大mapreduce.task.io.sort.mb。

map端相关的属性如下表:

从hadoop框架与MapReduce模式中谈海量数据处理

reduce优化

    在reduce端,如果能够让所有数据都保存在内存中,可以达到最佳的性能。通常情况下,内存都保留给reduce函数,但是如果reduce函数对内存需求不是很高,将mapreduce.reduce.merge.inmem.threshold(触发合并的map输出文件数)设为0,mapreduce.reduce.input.buffer.percent(用于保存map输出文件的堆内存比例)设为1.0,可以达到很好的性能提升。在2008年的TB级别数据排序性能测试中,Hadoop就是通过将reduce的中间数据都保存在内存中胜利的。

reduce端相关属性:

从hadoop框架与MapReduce模式中谈海量数据处理

通用优化

Hadoop默认使用4KB作为缓冲,这个算是很小的,可以通过io.file.buffer.size来调高缓冲池大小。


   从hadoop框架与MapReduce模式中谈海量数据处理

前言

    几周前,当我最初听到,以致后来初次接触Hadoop与MapReduce这两个东西,我便稍显兴奋,觉得它们很是神秘,而神秘的东西常能勾起我的兴趣,在看过介绍它们的文章或论文之后,觉得Hadoop是一项富有趣味和挑战性的技术,且它还牵扯到了一个我更加感兴趣的话题:海量数据处理。

    由此,最近凡是空闲时,便在看“Hadoop”,“MapReduce”“海量数据处理”这方面的论文。但在看论文的过程中,总觉得那些论文都是浅尝辄止,常常看的很不过瘾,总是一个东西刚要讲到紧要处,它便结束了,让我好生“愤懑”。

    尽管我对这个Hadoop与MapReduce知之甚浅,但我还是想记录自己的学习过程,说不定,关于这个东西的学习能督促我最终写成和“经典算法研究系列”一般的一系列文章。

    Ok,闲话少说。本文从最基本的mapreduce模式,Hadoop框架开始谈起,然后由各自的架构引申开来,谈到海量数据处理,最后谈谈淘宝的海量数据产品技术架构,以为了兼备浅出与深入之效,最终,希望得到读者的喜欢与支持。谢谢。

    由于本人是初次接触这两项技术,文章有任何问题,欢迎不吝指正。再谢一次。Ok,咱们开始吧。

第一部分、mapreduce模式与hadoop框架深入浅出

架构扼要

         想读懂此文,读者必须先要明确以下几点,以作为阅读后续内容的基础知识储备:

  1. Mapreduce是一种模式。
  2. Hadoop是一种框架。
  3. Hadoop是一个实现了mapreduce模式的开源的分布式并行编程框架。

    所以,你现在,知道了什么是mapreduce,什么是hadoop,以及这两者之间最简单的联系,而本文的主旨即是,一句话概括:在hadoop的框架上采取mapreduce的模式处理海量数据。下面,咱们可以依次深入学习和了解mapreduce和hadoop这两个东西了。

Mapreduce模式

    前面说了,mapreduce是一种模式,一种什么模式呢?一种云计算的核心计算模式,一种分布式运算技术,也是简化的分布式编程模式,它主要用于解决问题的程序开发模型,也是开发人员拆解问题的方法。

    Ok,光说不上图,没用。如下图所示,mapreduce模式的主要思想是将自动分割要执行的问题(例如程序)拆解成map(映射)和reduce(化简)的方式,流程图如下图1所示:

从hadoop框架与MapReduce模式中谈海量数据处理

    在数据被分割后通过Map 函数的程序将数据映射成不同的区块,分配给计算机机群处理达到分布式运算的效果,在通过Reduce 函数的程序将结果汇整,从而输出开发者需要的结果。

    MapReduce 借鉴了函数式程序设计语言的设计思想,其软件实现是指定一个Map 函数,把键值对(key/value)映射成新的键值对(key/value),形成一系列中间结果形式的key/value 对,然后把它们传给Reduce(规约)函数,把具有相同中间形式key 的value 合并在一起。Map 和Reduce 函数具有一定的关联性。函数描述如表1 所示:

从hadoop框架与MapReduce模式中谈海量数据处理

    MapReduce致力于解决大规模数据处理的问题,因此在设计之初就考虑了数据的局部性原理,利用局部性原理将整个问题分而治之。MapReduce集群由普通PC机构成,为无共享式架构。在处理之前,将数据集分布至各个节点。处理时,每个节点就近读取本地存储的数据处理(map),将处理后的数据进行合并(combine)、排序(shuffle and sort)后再分发(至reduce节点),避免了大量数据的传输,提高了处理效率。无共享式架构的另一个好处是配合复制(replication)策略,集群可以具有良好的容错性,一部分节点的down机对集群的正常工作不会造成影响。

    ok,你可以再简单看看下副图,整幅图是有关hadoop的作业调优参数及原理,图的左边是MapTask运行示意图,右边是ReduceTask运行示意图:

从hadoop框架与MapReduce模式中谈海量数据处理

    如上图所示,其中map阶段,当map task开始运算,并产生中间数据后并非直接而简单的写入磁盘,它首先利用内存buffer来对已经产生的buffer进行缓存,并在内存buffer中进行一些预排序来优化整个map的性能。而上图右边的reduce阶段则经历了三个阶段,分别Copy->Sort->reduce。我们能明显的看出,其中的Sort是采用的归并排序,即merge sort。

    了解了什么是mapreduce,接下来,咱们可以来了解实现了mapreduce模式的开源框架—hadoop。

Hadoop框架

    前面说了,hadoop是一个框架,一个什么样的框架呢?Hadoop 是一个实现了MapReduce 计算模型的开源分布式并行编程框架,程序员可以借助Hadoop 编写程序,将所编写的程序运行于计算机机群上,从而实现对海量数据的处理。

    此外,Hadoop 还提供一个分布式文件系统(HDFS)及分布式数据库(HBase)用来将数据存储或部署到各个计算节点上。所以,你可以大致认为:Hadoop=HDFS(文件系统,数据存储技术相关)+HBase(数据库)+MapReduce(数据处理)。Hadoop 框架如图2 所示:

从hadoop框架与MapReduce模式中谈海量数据处理

    借助Hadoop 框架及云计算核心技术MapReduce 来实现数据的计算和存储,并且将HDFS 分布式文件系统和HBase 分布式数据库很好的融入到云计算框架中,从而实现云计算的分布式、并行计算和存储,并且得以实现很好的处理大规模数据的能力。

Hadoop的组成部分

    我们已经知道,Hadoop是Google的MapReduce一个Java实现。MapReduce是一种简化的分布式编程模式,让程序自动分布到一个由普通机器组成的超大集群上并发执行。Hadoop主要由HDFS、MapReduce和HBase等组成。具体的hadoop的组成如下图:

从hadoop框架与MapReduce模式中谈海量数据处理

    由上图,我们可以看到:

    1、             Hadoop HDFS是Google GFS存储系统的开源实现,主要应用场景是作为并行计算环境(MapReduce)的基础组件,同时也是BigTable(如HBase、HyperTable)的底层分布式文件系统。HDFS采用master/slave架构。一个HDFS集群是有由一个Namenode和一定数目的Datanode组成。Namenode是一个中心服务器,负责管理文件系统的namespace和客户端对文件的访问。Datanode在集群中一般是一个节点一个,负责管理节点上它们附带的存储。在内部,一个文件其实分成一个或多个block,这些block存储在Datanode集合里。如下图所示(HDFS体系结构图):

从hadoop框架与MapReduce模式中谈海量数据处理

    2、             Hadoop MapReduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上TB级别的数据集。

    一个MapReduce作业(job)通常会把输入的数据集切分为若干独立的数据块,由 Map任务(task)以完全并行的方式处理它们。框架会对Map的输出先进行排序,然后把结果输入给Reduce任务。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。如下图所示(Hadoop MapReduce处理流程图):

从hadoop框架与MapReduce模式中谈海量数据处理

    3、             Hive是基于Hadoop的一个数据仓库工具,处理能力强而且成本低廉。

主要特点

存储方式是将结构化的数据文件映射为一张数据库表。提供类SQL语言,实现完整的SQL查询功能。可以将SQL语句转换为MapReduce任务运行,十分适合数据仓库的统计分析。

不足之处:

采用行存储的方式(SequenceFile)来存储和读取数据。效率低:当要读取数据表某一列数据时需要先取出所有数据然后再提取出某一列的数据,效率很低。同时,它还占用较多的磁盘空间。

由于以上的不足,有人(查礼博士)介绍了一种将分布式数据处理系统中以记录为单位的存储结构变为以列为单位的存储结构,进而减少磁盘访问数量,提高查询处理性能。这样,由于相同属性值具有相同数据类型和相近的数据特性,以属性值为单位进行压缩存储的压缩比更高,能节省更多的存储空间。如下图所示(行列存储的比较图):

从hadoop框架与MapReduce模式中谈海量数据处理

4、             HBase

    HBase是一个分布式的、面向列的开源数据库,它不同于一般的关系数据库,是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。HBase使用和 BigTable非常相同的数据模型。用户存储数据行在一个表里。一个数据行拥有一个可选择的键和任意数量的列,一个或多个列组成一个ColumnFamily,一个Fmaily下的列位于一个HFile中,易于缓存数据。表是疏松的存储的,因此用户可以给行定义各种不同的列。在HBase中数据按主键排序,同时表按主键划分为多个HRegion,如下图所示(HBase数据表结构图):

从hadoop框架与MapReduce模式中谈海量数据处理

    Ok,行文至此,看似洋洋洒洒近千里,但若给读者造成阅读上的负担,则不是我本意。接下来的内容,我不会再引用诸多繁杂的专业术语,以给读者心里上造成不良影响。

    我再给出一副图,算是对上文所说的hadoop框架及其组成部分做个总结,如下图所示,便是hadoop的内部结构,我们可以看到,海量的数据交给hadoop处理后,在hadoop的内部中,正如上文所述:hadoop提供一个分布式文件系统(HDFS)及分布式数据库(Hbase)用来存储或部署到各个计算点上,最终在内部采取mapreduce的模式对其数据进行处理,然后输出处理结果:

从hadoop框架与MapReduce模式中谈海量数据处理

第二部分、淘宝海量数据产品技术架构解读—学习海量数据处理经验

    在上面的本文的第一部分中,我们已经对mapreduce模式及hadoop框架有了一个深入而全面的了解。不过,如果一个东西,或者一个概念不放到实际应用中去,那么你对这个理念永远只是停留在理论之内,无法向实践迈进。

    Ok,接下来,本文的第二部分,咱们以淘宝的数据魔方技术架构为依托,通过介绍淘宝的海量数据产品技术架构,来进一步学习和了解海量数据处理的经验。

淘宝海量数据产品技术架构

    如下图2-1所示,即是淘宝的海量数据产品技术架构,咱们下面要针对这个架构来一一剖析与解读。

    相信,看过本博客内其它文章的细心读者,定会发现,图2-1最初见于本博客内的此篇文章:从几幅架构图中偷得半点海量数据处理经验之上,同时,此图2-1最初发表于《程序员》8月刊,作者:朋春。

    在此之前,有一点必须说明的是:本文下面的内容大都是参考自朋春先生的这篇文章:淘宝数据魔方技术架构解析所写,我个人所作的工作是对这篇文章的一种解读与关键技术和内容的抽取,以为读者更好的理解淘宝的海量数据产品技术架构。与此同时,还能展示我自己读此篇的思路与感悟,顺带学习,何乐而不为呢?。

    Ok,不过,与本博客内之前的那篇文章(几幅架构图中偷得半点海量数据处理经验)不同,本文接下来,要详细阐述这个架构。我也做了不少准备工作(如把这图2-1打印了下来,经常琢磨):

 从hadoop框架与MapReduce模式中谈海量数据处理

                                              图2-1 淘宝海量数据产品技术架构

    好的,如上图所示,我们可以看到,淘宝的海量数据产品技术架构,分为以下五个层次,从上至下来看,它们分别是:数据源,计算层,存储层,查询层和产品层。我们来一一了解这五层:

  1. 数据来源层。存放着淘宝各店的交易数据。在数据源层产生的数据,通过DataX,DbSync和Timetunel准实时的传输到下面第2点所述的“云梯”。
  2. 计算层。在这个计算层内,淘宝采用的是hadoop集群,这个集群,我们暂且称之为云梯,是计算层的主要组成部分。在云梯上,系统每天会对数据产品进行不同的mapreduce计算。
  3. 存储层。在这一层,淘宝采用了两个东西,一个使MyFox,一个是Prom。MyFox是基于MySQL的分布式关系型数据库的集群,Prom是基于hadoop Hbase技术 的(读者可别忘了,在上文第一部分中,咱们介绍到了这个hadoop的组成部分之一,Hbase—在hadoop之内的一个分布式的开源数据库)的一个NoSQL的存储集群。
  4. 查询层。在这一层中,有一个叫做glider的东西,这个glider是以HTTP协议对外提供restful方式的接口。数据产品通过一个唯一的URL来获取到它想要的数据。同时,数据查询即是通过MyFox来查询的。下文将具体介绍MyFox的数据查询过程。
  5.  产品层。简单理解,不作过多介绍。

    接下来,咱们重点来了解第三层-存储层中的MyFox与Prom,然后会稍带分析下glide的技术架构,最后,再了解下缓存。文章即宣告结束。

    我们知道,关系型数据库在我们现在的工业生产中有着广泛的引用,它包括Oracle,MySQL、DB2、Sybase和SQL Server等等。

MyFOX

    淘宝选择了MySQL的MyISAM引擎作为底层的数据存储引擎。且为了应对海量数据,他们设计了分布式MySQL集群的查询代理层-MyFOX。

如下图所示,是MySQL的数据查询过程:

从hadoop框架与MapReduce模式中谈海量数据处理

                                                            图2-2 MyFOX的数据查询过程

    在MyFOX的每一个节点中,存放着热节点和冷节点两种节点数据。顾名思义,热节点存放着最新的,被访问频率较高的数据;冷节点,存放着相对而来比较旧的,访问频率比较低的数据。而为了存储这两种节点数据,出于硬件条件和存储成本的考虑,你当然会考虑选择两种不同的硬盘,来存储这两种访问频率不同的节点数据。如下图所示:

从hadoop框架与MapReduce模式中谈海量数据处理

                                                           图2-3 MyFOX节点结构

     “热节点”,选择每分钟15000转的SAS硬盘,按照一个节点两台机器来计算,单位数据的存储成本约为4.5W/TB。相对应地,“冷数据”我们选择了每分钟7500转的SATA硬盘,单碟上能够存放更多的数据,存储成本约为1.6W/TB。

Prom

出于文章篇幅的考虑,本文接下来不再过多阐述这个Prom了。如下面两幅图所示,他们分别表示的是Prom的存储结构以及Prom查询过程:

从hadoop框架与MapReduce模式中谈海量数据处理

                                              图2-4 Prom的存储结构

 从hadoop框架与MapReduce模式中谈海量数据处理

                                                          图2-5 Prom查询过程

glide的技术架构

    从hadoop框架与MapReduce模式中谈海量数据处理

                                               图2-6 glider的技术架构

    在这一层-查询层中,淘宝主要是基于用中间层隔离前后端的理念而考虑。Glider这个中间层负责各个异构表之间的数据JOIN和UNION等计算,并且负责隔离前端产品和后端存储,提供统一的数据查询服务。

缓存

    除了起到隔离前后端以及异构“表”之间的数据整合的作用之外,glider的另外一个不容忽视的作用便是缓存管理。我们有一点须了解,在特定的时间段内,我们认为数据产品中的数据是只读的,这是利用缓存来提高性能的理论基础。

在上文图2-6中我们看到,glider中存在两层缓存,分别是基于各个异构“表”(datasource)的二级缓存和整合之后基于独立请求的一级缓存。除此之外,各个异构“表”内部可能还存在自己的缓存机制。

从hadoop框架与MapReduce模式中谈海量数据处理

                                                           图2-7 缓存控制体系

    图2-7向我们展示了数据魔方在缓存控制方面的设计思路。用户的请求中一定是带了缓存控制的“命令”的,这包括URL中的query string,和HTTP头中的“If-None-Match”信息。并且,这个缓存控制“命令”一定会经过层层传递,最终传递到底层存储的异构“表”模块。

    缓存系统往往有两个问题需要面对和考虑:缓存穿透与失效时的雪崩效应。

  1. 缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。至于如何有效地解决缓存穿透问题,最常见的则是采用布隆过滤器(这个东西,在我的此篇文章中有介绍:),将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。

    而在数据魔方里,淘宝采用了一个更为简单粗暴的方法,如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。

      2、缓存失效时的雪崩效应尽管对底层系统的冲击非常可怕。但遗憾的是,这个问题目前并没有很完美的解决方案。大多数系统设计者考虑用加锁或者队列的方式保证缓存的单线程(进程)写,从而避免失效时大量的并发请求落到底层存储系统上。

    在数据魔方中,淘宝设计的缓存过期机制理论上能够将各个客户端的数据失效时间均匀地分布在时间轴上,一定程度上能够避免缓存同时失效带来的雪崩效应。

本文参考:

  1. 基于云计算的海量数据存储模型,侯建等。
  2. 基于hadoop的海量日志数据处理,王小森
  3. 基于hadoop的大规模数据处理系统,王丽兵。
  4. 淘宝数据魔方技术架构解析,朋春。
  5. Hadoop作业调优参数整理及原理,guili。

读者点评@xdylxdyl:

  1. We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That's map. The more people we get, the faster it goes. Now we get together and add our individual counts. That's reduce。
  2. 数据魔方里的缓存穿透,架构,空数据缓存这些和Hadoop一点关系都么有,如果是想讲一个Hadoop的具体应用的话,数据魔方这部分其实没讲清楚的。
  3. 感觉你是把两个东西混在一起了。不过这两个都是挺有价值的东西,或者说数据魔方的架构比Hadoop可能更重要一些,基本上大的互联网公司都会选择这么做。Null对象的缓存保留五分钟未必会有好的结果吧,如果Null对象不是特别大,数据的更新和插入不多也可以考虑实时维护。
  4. Hadoop本身很笨重,不知道在数据魔方里是否是在扮演着实时数据处理的角色?还是只是在做线下的数据分析的?

结语:写文章是一种学习的过程。尊重他人劳动成果,转载请注明出处。谢谢。July、2011/8/20。完。


本节书摘来异步社区《Hadoop MapReduce性能优化》一书中的第1章,第1.4节,作者: 【法】Khaled Tannir 译者: 范欢动 责编: 杨海玲,更多章节内容可以访问云栖社区“异步社区”公众号查看。

1.4 影响MapReduce性能的因素

Hadoop MapReduce性能优化
影响MapReduce输入数据处理时间的因素很多。其中之一是实现map和reduce函数时使用的算法。其他外部因素也可能影响MapReduce性能。根据我们的经验和观察,可能影响MapReduce的主要因素有以下几个。

  • 硬件(或者资源)因素,如CPU时钟、磁盘I/O、网络带宽和内存大小。
  • 底层存储系统。
  • 输入数据、分拣(shuffle)数据以及输出数据的大小,这与作业的运行时间紧密相关。
  • 作业算法(或者程序),如map、reduce、partition、combine和compress。有些算法很难在MapReduce中概念化,或者在MapReduce中效率可能会降低。

运行map任务时,shuffle子任务的中间输出存储在内存缓冲区中,用以减少磁盘I/O。输出的大小可能会超过内存缓冲区而造成溢出,因此需要spill子阶段把数据刷新到本地文件系统。这个子阶段也会影响MapReduce性能,它经常采用多线程技术实现,以便使磁盘I/O利用率最大化并缩减作业的运行时间。

MapReduce编程模型允许用户使用自己的map和reduce函数指定数据转换逻辑。本模型并不限定map函数产生的中间对在交由reduce函数处理前如何被分组。因此,归并排序(merge-sort)算法被用作默认的分类算法。然而,归并排序算法并非总是最高效的,尤其是对分析型任务(如聚合和等值连接)而言,这类任务并不关心中间键的顺序。

提示.tif对于MapReduce编程模型来说,分组(grouping)/划分(partitioning)是一个串行的任务。这就意味着在reduce任务可以运行之前,框架需要等待所有map任务完成。

想要深入学习归并排序算法,请参考http://en.wikipedia.org/wiki/Merge_sort
MapReduce性能是以map和reduce的运行时间为基础的。这是因为典型环境下集群节点数目和节点插槽数目这类参数是不可修改的。

其他可能对MapReduce性能构成潜在影响的因素具体如下。

  • I/O模式:也就是从存储系统获取数据的方式。从底层存储系统读取数据有以下两种模式。
    直接I/O:通过硬件控制器把数据从本地硬盘缓存中直接读到内存,因而不需要进程间通信成本。

流式I/O:通过特定进程间通信手段,如TCP/IP和JDBC,从其他正在运行进程(典型情况是存储系统进程)读取数据。

从提高性能的角度看,使用直接I/O比流式I/O更高效。

  • 输入数据解析:是指从存储系统获取数据时,从原始数据到键值对的转换过程。数据解析过程的目标是把原始数据按照原来的格式解码,并转换为可供Java等编程语言处理的数据对象。

输入数据可以解码为(Java或者其他语言)对象,这样当对象实例创建后,对象内容可以改变,典型的情况是使用对对象实例的引用(这样的对象叫做可变对象),输入数据也可以解码为一经创建其内容就不可改变的对象(叫做不可变对象)。在上百万条记录的情况下,不可变对象的解码过程会明显比可变对象的解码过程慢,这是因为在前者解码过程中产生了大量不可变对象。因此,这会导致系统性能的降低。 - 输入数据存储:当MapReduce获取数据并进行进一步处理时,所在的存储系统必须保证高速访问和数据可用性(如HDFS和HBase)。如果选用的不是那些推荐的与MapReduce一起使用的存储文件系统,那么输入数据的访问会潜在地影响MapReduce性能。

使用Hadoop框架时,许多因素可能会影响整个系统的性能和作业的运行时间。这些因素可能是Hadoop MapReduce引擎的一部分,也可能是引擎之外的。

Hadoop配置参数通常会影响并发运行的任务数,并决定作业的运行时间,因为Hadoop集群被建立且作业开始执行后,其他因素就不可改变了。如果Hadoop框架配置不当,可能无法充分利用集群资源,并因此影响MapReduce作业性能。这是因为大量的配置参数控制着Hadoop框架的行为。

一项Hadoop作业经常由许多实现不同算法的子模块组成,这些子模块要么以串行方式连接,要么以并行方式连接。如果Hadoop框架配置不当,可能会影响内部任务完成的协作方式。所有这类参数(将在第2章讨论)设置的影响都依赖于map和reduce函数的代码、集群资源,当然还有输入数据。

MapReduce作业的性能也可能受Hadoop集群节点数的影响,以及受所有节点中运行map和reduce任务的可用资源的影响。每个节点的容量决定了一个节点可以执行的mapper和recducer任务的数量。因此,如果节点资源利用不充分或者过度利用,都会直接影响MapReduce任务的性能。