Mapper和Reduce阶段流程
一、MR的编写
1. Mapper
MapTask中负责Map阶段核心运算逻辑的类!
①继承Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
②KEYIN,VALUEIN 取决于InputFormat中RecordReader的设置
KEYOUT,VALUEOUT由自己定义
③Mapper的运行流程
由MapTask调用Mapper.run()
run(){
setUp();
while(context.nextKeyValue()) //循环调用RR读取下一组输入的key-value
{
map(key,value,context);
}
cleanUp();
}
④在Mapper的map()中编写核心处理逻辑
2. Reducer
ReduceTask中负责Reduce阶段核心运算逻辑的类!
①继承Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
②KEYIN,VALUEIN 取决于Mapper的输出
KEYOUT,VALUEOUT由自己定义
③Reducer的运行流程
由MapTask调用Reducer.run()
run(){
setUp();
while(context.nextKey()) // 使用迭代器不断迭代key相同的数据
{
reduce(key,value,context);
}
cleanUp();
}
④在Reducer的reduce()中编写核心处理逻辑
3. Job
①创建Job
Job job=Job.getInstance(Configuration conf);
②设置Job
Job.setName("job名"); //设置名称
Job.setJar("jar名"); | Job.setJarByClass(类名) // 设置Job所在的Jar包,只有在YARN上运行时才需要设置
③配置Job
设置输入和输出格式,如果不设置使用默认
设置Mapper,Reducer
设置Mapper和Reducer的输出类型 // 主要为了方便根据类型获取对应的序列化器
设置输入和输出目录
④运行Job
Job.waitforCompletion(true);
二、Read阶段的流程
根据InputFormat
①切片, getSplit()
②使用输入格式的RR读取数据, createRecordReader()
1.默认的TextInputFormat
场景: 普通的文本格式数据来源
切片: 采用默认的切片策略,以文件为单位,先判断文件是否可切,如果可切,循环以片大小为单位切片!
不可切,整个文件作为1片!
RR : LineRecordReader(将一行封装为一个key-value)
LongWritable key: 行的偏移量
Text value: 行的内容
2. NLineInputFormat
场景: 适合一行的内容特别多,在Map阶段map()处理的逻辑非常复杂!
根据行数自定义切片的大小!
切片: 可以设置以文件为单位,每N行作为一个切片!
RR : LineRecordReader(将一行封装为一个key-value)
LongWritable key: 行的偏移量
Text value: 行的内容
3. KeyValueTextInputFormat
场景: 一行的内容的格式 为 key-value,方便地将key,value拆分封装
切片: 采用默认的切片策略,以文件为单位,先判断文件是否可切,如果可切,循环以片大小为单位切片!
不可切,整个文件作为1片!
RR : KeyValueRecordReader(将一行封装为一个key-value)
Text key: 行的分隔符之前的部分内容
Text value: 行的分隔符之后的部分内容
4. CombineTextInputFormat
场景: 输入目录中小文件过多,可以将多个小文件设置到一个切片中!
切片: ①根据maxSize对每个文件进行逻辑切片,切分为若干part
②将多个part组合,知道超过maxSize,这些part作为一个切片
RR : LineRecordReader(将一行封装为一个key-value)
LongWritable key: 行的偏移量
Text value: 行的内容
三、切片和块
切片: 对文件进行逻辑切分,只有在运行MR任务时,才会对文件切分!
切分时,切片的大小不同,每个文件切分的结果也不同!
块: 文件在上传到HDFS时,在HDFS上存储的最小单位,物理存储!
关系: MapTask在读取切片的内容时,需要根据切片的metainfo,获取到当前切片属于文件的哪部分!
再根据此信息去寻找对应的块,读取数据!
默认切片大小等于块大小,主要为了减少在运行MR时,大量的跨机器读取切片内容带来额外的网络IO!
根据默认的策略策略,可以调整切片的大小:
调整切片大小 大于 块大小: 调整minSize
调整切片大小 小于 块大小: 调整maxSize
四、Job提交流程
①提交之前的准备阶段
a)检查输出目录是否合法
b)为Job设置很多属性(用户,ip,主机名..)
c)使用InputFormat对输入目录中的文件进行切片
设置Job运行的mapTask的数量为切片的数量
d)在Job的作业目录生成Job执行的关键文件
job.split (job的切片对象)
job.splitmetainfo(job切片的属性信息)
job.xml(job所有的配置)
e) 正式提交Job
②本地模式
在提交Job后,创建LocalJobRunner.Job.Job对象,启动线程!
在LocalJobRunner.Job.Job启动的线程中,使用线程池,用多线程的形式模拟MapTask和ReduceTask的多进程运行!
执行Map,调用线程池,根据Map的切片信息,创建若干MapTaskRunable线程,在线程池上运行多个线程!
MapTaskRunable------>MapTask--------->Mapper--------->Mapper.run()------->Mapper.map()
Map阶段运行完后,执行Reduce,调用线程池,根据Job设置的ReduceTask的数量,
创建若干ReduceTaskRunable线程,在线程池上运行多个线程!
ReduceTaskRunable------->ReduceTask------>Reducer----->Reducer.run()------>Reducer.reduce()
③YARN上运行
在提交Job后,创建MRAppMaster进程!
由MRAppMaster,和RM申请,申请启动多个MapTask,多个ReduceTask
Container------>MapTask--------->Mapper--------->Mapper.run()------->Mapper.map()
Container------->ReduceTask------>Reducer----->Reducer.run()------>Reducer.reduce()
五、Job提交之后MR的核心阶段划分
总的来说: Map-----------------Reduce----------------------
MapTask-------------ReduceTask------------------
map------sort-------copy------sort---------reduce
详细的划分: Map---------- -----------shuffle-----------Reduce
MapTask----------------------------ReduceTask-----
map------sort-------copy------sort---------reduce
map--------shuffle(sort-------copy------sort)-----reduce
如果当前Job没有Reduce阶段,MapTask只有map,没有sort
如果当前Job有Reduce阶段,可以将Map-Reduce再详细分为Map--Shuffle---Reduce阶段
Shuffle的含义为洗牌,将Map阶段写出的数据,进行洗牌(将数据整理的有序,方便Reducer进行reduce)!
Shuffle阶段横跨MapTask和RedcueTask,在MapTask端也有Shuffle,在RedcueTask也有Shuffle!
具体Shuffle阶段指MapTask的map之后到RedcuceTask的reduce之前!