Mapper和Reduce阶段流程

一、MR的编写
1. Mapper
        MapTask中负责Map阶段核心运算逻辑的类!
        ①继承Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
        ②KEYIN,VALUEIN 取决于InputFormat中RecordReader的设置
            KEYOUT,VALUEOUT由自己定义
        ③Mapper的运行流程
            由MapTask调用Mapper.run()
            
            run(){
                setUp();
                while(context.nextKeyValue()) //循环调用RR读取下一组输入的key-value
                {
                    map(key,value,context);
                }
                cleanUp();
            }
        ④在Mapper的map()中编写核心处理逻辑
                
2. Reducer
        ReduceTask中负责Reduce阶段核心运算逻辑的类!
        ①继承Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
        ②KEYIN,VALUEIN 取决于Mapper的输出
            KEYOUT,VALUEOUT由自己定义
        ③Reducer的运行流程
            由MapTask调用Reducer.run()
            
            run(){
                setUp();
                while(context.nextKey()) // 使用迭代器不断迭代key相同的数据
                {
                    reduce(key,value,context);
                }
                cleanUp();
            }
        ④在Reducer的reduce()中编写核心处理逻辑

3. Job
        ①创建Job
            Job job=Job.getInstance(Configuration conf);
            
        ②设置Job
            Job.setName("job名");  //设置名称
            Job.setJar("jar名"); |  Job.setJarByClass(类名)  // 设置Job所在的Jar包,只有在YARN上运行时才需要设置
        ③配置Job
                设置输入和输出格式,如果不设置使用默认
                设置Mapper,Reducer
                设置Mapper和Reducer的输出类型 // 主要为了方便根据类型获取对应的序列化器
                设置输入和输出目录
        ④运行Job
                Job.waitforCompletion(true);
                
二、Read阶段的流程
根据InputFormat
①切片, getSplit()
②使用输入格式的RR读取数据, createRecordReader()

1.默认的TextInputFormat
        场景:  普通的文本格式数据来源
        切片:  采用默认的切片策略,以文件为单位,先判断文件是否可切,如果可切,循环以片大小为单位切片!
                不可切,整个文件作为1片!
                
        RR :  LineRecordReader(将一行封装为一个key-value)
                LongWritable    key: 行的偏移量
                Text  value:  行的内容
                
2. NLineInputFormat
        场景: 适合一行的内容特别多,在Map阶段map()处理的逻辑非常复杂!
                根据行数自定义切片的大小!
                    
        切片:    可以设置以文件为单位,每N行作为一个切片!
        
        RR :  LineRecordReader(将一行封装为一个key-value)
                LongWritable    key: 行的偏移量
                Text  value:  行的内容
                
3. KeyValueTextInputFormat
        场景:  一行的内容的格式 为 key-value,方便地将key,value拆分封装
        
        切片:  采用默认的切片策略,以文件为单位,先判断文件是否可切,如果可切,循环以片大小为单位切片!
                不可切,整个文件作为1片!

        RR :  KeyValueRecordReader(将一行封装为一个key-value)
                Text    key:  行的分隔符之前的部分内容
                Text  value:   行的分隔符之后的部分内容
                
4. CombineTextInputFormat
        场景:  输入目录中小文件过多,可以将多个小文件设置到一个切片中!
        
        切片:  ①根据maxSize对每个文件进行逻辑切片,切分为若干part
                ②将多个part组合,知道超过maxSize,这些part作为一个切片
                
        
        RR :  LineRecordReader(将一行封装为一个key-value)
                LongWritable    key: 行的偏移量
                Text  value:  行的内容    
            
三、切片和块
切片:  对文件进行逻辑切分,只有在运行MR任务时,才会对文件切分!
        切分时,切片的大小不同,每个文件切分的结果也不同!
        
块:  文件在上传到HDFS时,在HDFS上存储的最小单位,物理存储!

关系: MapTask在读取切片的内容时,需要根据切片的metainfo,获取到当前切片属于文件的哪部分!
        再根据此信息去寻找对应的块,读取数据!
        
默认切片大小等于块大小,主要为了减少在运行MR时,大量的跨机器读取切片内容带来额外的网络IO!

根据默认的策略策略,可以调整切片的大小:
    调整切片大小 大于 块大小: 调整minSize
    调整切片大小 小于 块大小:  调整maxSize
    
四、Job提交流程
①提交之前的准备阶段
        a)检查输出目录是否合法
        b)为Job设置很多属性(用户,ip,主机名..)
        c)使用InputFormat对输入目录中的文件进行切片
                设置Job运行的mapTask的数量为切片的数量
        d)在Job的作业目录生成Job执行的关键文件
                job.split (job的切片对象)
                job.splitmetainfo(job切片的属性信息)
                job.xml(job所有的配置)
                
        e) 正式提交Job
        
②本地模式
        在提交Job后,创建LocalJobRunner.Job.Job对象,启动线程!
        在LocalJobRunner.Job.Job启动的线程中,使用线程池,用多线程的形式模拟MapTask和ReduceTask的多进程运行!
        执行Map,调用线程池,根据Map的切片信息,创建若干MapTaskRunable线程,在线程池上运行多个线程!
        
        MapTaskRunable------>MapTask--------->Mapper--------->Mapper.run()------->Mapper.map()
        
        Map阶段运行完后,执行Reduce,调用线程池,根据Job设置的ReduceTask的数量,
        创建若干ReduceTaskRunable线程,在线程池上运行多个线程!
        
        ReduceTaskRunable------->ReduceTask------>Reducer----->Reducer.run()------>Reducer.reduce()
        
③YARN上运行
        在提交Job后,创建MRAppMaster进程!
        
        由MRAppMaster,和RM申请,申请启动多个MapTask,多个ReduceTask
        
        Container------>MapTask--------->Mapper--------->Mapper.run()------->Mapper.map()
        Container------->ReduceTask------>Reducer----->Reducer.run()------>Reducer.reduce()
        
五、Job提交之后MR的核心阶段划分
        总的来说: Map-----------------Reduce----------------------
                   MapTask-------------ReduceTask------------------
                   map------sort-------copy------sort---------reduce
                   
        详细的划分:  Map---------- -----------shuffle-----------Reduce
                      MapTask----------------------------ReduceTask-----
                       map------sort-------copy------sort---------reduce
                       map--------shuffle(sort-------copy------sort)-----reduce
                   
        如果当前Job没有Reduce阶段,MapTask只有map,没有sort
        如果当前Job有Reduce阶段,可以将Map-Reduce再详细分为Map--Shuffle---Reduce阶段
        Shuffle的含义为洗牌,将Map阶段写出的数据,进行洗牌(将数据整理的有序,方便Reducer进行reduce)!
        Shuffle阶段横跨MapTask和RedcueTask,在MapTask端也有Shuffle,在RedcueTask也有Shuffle!
        具体Shuffle阶段指MapTask的map之后到RedcuceTask的reduce之前!

 

Mapper和Reduce阶段流程