第7章 MapReduce

第7章 MapReduce

7.1 概述

7.1.1 分布式并行编程

MapReduce相较于传统的并行计算框架有什么优势?
第7章 MapReduce


7.1.2 MapReduce 模型简介

•MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽 象到了两个函数:Map和Reduce

•编程容易,不需要掌握分布式并行编程细节,也可以很容易把自己的程序运行在分布式系统上,完成海量数据的计算 •MapReduce采用“分而治之”策略,一个存储在分布式文件系统中的 大规模数据集,会被切分成许多独立的分片(split),这些分片可以被 多个Map任务并行处理

•MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销,即代码到处跑,而不是移动数据

•MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave。 Master上运行JobTracker,Slave上运行TaskTracker

•Hadoop框架是用Java实现的,但是,MapReduce应用程序则不一定要用Java来写


7.1.3 Map和Reduce函数

第7章 MapReduce

Map函数将输入的元素转换成<key,value>形式的键值对(键没有唯一性)。

Reduce函数将输入的一系列具有相同键的键值对以某种方式组合起来,输出处理后的键值对。

比如,我们想编写一个MapReduce程序来统计一个文本文件中每个单词出现的次数,对于表7-1中的Map函数的输入<k1,v1>而言,其具体数据就是<某一行文本在文件中的偏移位置,改行文本的内容>。用户可以自己编写Map函数处理过程,把文件中的一行读取后解析出每个单词,生成一批中间结果<单词,出现次数>,然后把这些中间结果作为Reduce函数的输入,Reduce函数的具体处理过程也是由用户自己编写的,用户可以将相同单词的出现次数进行累加,得到每个单词出现的总次数。


7.2 MapReduce的体系结构

MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、 TaskTracker以及Task

第7章 MapReduce

1)Client

•用户编写的MapReduce程序通过Client提交到JobTracker端

•用户可通过Client提供的一些接口查看作业运行状态

2)JobTracker

•JobTracker负责资源监控和作业调度

•JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点

•JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器 (TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源

3)TaskTracker

•TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、 杀死任务等)

•TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用

4)Task

•Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动


7.3 MapReduce工作流程

7.3.1 工作流程概述

第7章 MapReduce

•不同的Map任务之间不会进行通信

•不同的Reduce任务之间也不会发生任何信息交换

•用户不能显式地从一台机器向另一台机器发送消息

•所有的数据交换都是通过MapReduce框架自身去实现的

•只有Map需要考虑数据局部性,实现“计算向数据靠拢”,而Reduce则无需考虑数据局部性


7.3.2 MapReduce的各个执行阶段

第7章 MapReduce

(1)MapReduce框架使用InputFormat模块做Map前的预处理,比如验证输入的格式是否符合输入定义;然后,将输入文件切分为逻辑上的多个InputSplit,InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件进行实际切割,只是记录了要处理的数据的位置和长度。

(2)因为InputSplit是逻辑切分而非物理切分,所以还需通过RecordReader根据InputSplit中的信息来处理InputSplit中的具体记录,加载数据并转换为适合Map任务读取的键值对,输入给Map任务。

(3)Map任务会根据用户自定义的映射规则,输出一系列的<key,value>作为中间结果。

(4)为了让Reduce可以并行处理Map的结果,需要对Map的输出进行一定的分区(partition)、排序(sort)、合并(combine)、归并(merge)等操作,得到<key,value>形式的中间结果,再交给对应的Reduce进行处理,这个过程称为shuffle。从无序的<key,value>到有序的<key,value-list>,这个过程用Shuffle来称呼是非常形象的。

(5)Reduce以一系列<key,value-list>中间结果为输入,执行用户定义的逻辑,输出结果给OutputFormat模块。

(6)OutpFormat模块会验证输出目录是否已经存在以及输出结果类型是否符合配置文件中的配置类型,如果都满足,就输出Reduce的结果到分布式文件系统。


7.3.3 Shuffle 过程详解

1.shuffle过程简介

第7章 MapReduce

(1)在Map端的Shuffle

Map的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件并清空缓存。当启动溢写操作时,首先需要把缓存中的数据进行分区,然后对每个分区的数据进行排序和合并,之后再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束之前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的Reduce任务来领取属于自己处理的数据。

(2)在Reduce端的Shuffle过程

Reduce任务从Map端的不同Map机器领回属于自己处理的那部分数据,然后对数据进行归并后交给Reduce处理。

2.Map端的Shuffle过程

第7章 MapReduce

(1)输入数据和执行Map任务

(2)写入缓存

(3)溢写(分区、排序和合并)

​ 当缓存溢出时(不是全部占满,会留一些空间保证Map结果能不断写入缓存,比如当100MB大小的缓存被填满80MB数据时,就启动溢写过程),启动溢写。

分区: MapReduce通过Partitioner接口对这些键值对进行分区,默认采用的分区方式是采用Hash函数对key进行哈希后再用Reduce任务的数量R进行取模,可以表示为hash(key)mod R。也可以通过重载Partitioner接口来自定义分区方式。

合并: 是指将那些具有相同key的<key,value>的value加起来。比如,有两个键值对<‘xmu’,1>和<‘xmu’,2>,经过合并操作以后可以得到一个键值对<‘xmu’,2>,减少了键值操作发生在Map端,所以我们只能称之为“合并”,从而有别于Reduce。

(4)文件归并

​ 所谓“归并”,是指对于具有相同key的键值对会被归并成一个新的键值对。具体而言,对于若干个具有相同key的键值对<k1,v1>,<k1,v2>,……,<k1,vn>

会被归并成一个新的键值对<k1,<v1,v2,……,vn>>。

​ 经过上述4个步骤之后,Map端的shuffle过程全部完成,最终生成的一个大文件会被存取在本地磁盘上。这个大文件中的数据是被区分的,不同的分区会被发送到不同的Reduce任务进行并行处理。JobTracker会一直监测Map任务的执行,当监测到一个Map任务完成后,就会立即通知相关的Reduce任务来“领取”数据,然后开始Reduce端的Shuffle过程。

3.Reduce端的Shuffle过程

第7章 MapReduce

(1)“领取”数据

​ Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。Map任务成功完成后,会通知父TaskTracker状态已经更新,TaskTracker进而通知JobTracker(这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker能记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会从此输出对应的TaskTracker上复制输出到本地,而不会等到所有的Map任务结束。

(2)归并数据

(3)把数据输入给Reduce任务