MapReduce

MapReduce模型

MapReduce采用“分而治之”策略,一个大规模数据集进行分片,多个Map任务并行处理。实现“计算向数据靠拢”理念,而不比大量移动数据造成网络开销。
MapReduce采用Master/Slave架构,一个Master,若干Slave。Master运行JobTracker负责作业调度,Slave运行TaskTracker负责具体作业处理。
JobTracker

  1. 负责任务调度与资源监控。
  2. 监控Job和TaskTracker的健康状态,一旦失败,相应任务就要发生转移。
  3. 跟踪任务进度,汇报给调度器,调度器根据在资源空闲时,分配合适的任务。

TaskTracker

  1. 定期使用“心跳”向JobTracker报告任务进度,同时接受新任务。
  2. 使用“slot”等量划分资源,调度的基本单位,一个Task只有拥有一个“slot”才能执行,调度器就是把空闲的“slot”分配给Task,分为Map slot和Reduce slot。

Task
分为Map Task和Reduce Task,都由TaskTracker启动。
MapReduce

MapReduce执行过程

MapReduce
InputFormat对HDFS中的数据进行加载,进行split(逻辑分片,HDFS中的Block是物理分片),RR(RecordReader)将各个分片的数据从HDFS中读取出来以键值对输出作为Map函数(用户程序自己编写的逻辑)进行输入,输出中间结果进行Shufflc,传给Reduce函数输出最终结果。
Split
逻辑上进行分片,分片的依据用户可以自定义,但分片的数量决定了Map任务的数量,理想分片是HDFS的块。Reduce任务的数量通常是比集群中Reduce slot槽的总量略小一点。

Shufflc
分为Map端Shufflc和Reduce端Shufflc
MapReduce
Map端Shufflc
MapReduce
每个任务配一个缓存,溢写比例0.8

  1. 分区默认采用哈希函数
  2. 排序是默认操作
  3. 合并不能改变最终结果,不一定发生
  4. Map任务全部结束前对溢写的文件(大于预定值可以再次合并)进行归并,得到一个大的本地文件
  5. JobTracker会检测Map任务进度,通知Reduce任务来处理数据

Reduce端Shufflc
MapReduce
来自不同Map机器的数据先写入缓存,归并数据,对溢写文件进行归并,输入给Reduce任务。数据小的话不发生溢写直接给Reduce。

MapReduce编程

重写map和reduce任务,实现词频统计。eclipse中编写MapReduce程序

package org.apache.hadoop.examples;
 
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCount {
    public WordCount() {
    }
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if(otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
 
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        for(int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
 
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
 
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
 
        public IntSumReducer() {
        }
 
        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
 
            IntWritable val;
            for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                val = (IntWritable)i$.next();
            }
 
            this.result.set(sum);
            context.write(key, this.result);
        }
    }
 
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();
 
        public TokenizerMapper() {
        }
 
        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
 
            while(itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
 
        }
    }
}