MapReduce编程模型详解— —以经典Word Count为例

MapReduce:来自于Google的计算模式

MapReduce最早是由Google研究提出的一种面向大规模数据处理的并行计算模型和方法,其初衷主要是为了解决搜索引擎中大规模网页数据的并行化处理。由于MapReduce可以普遍应用于很多大规模数据的计算问题,Google进一步将其广泛应用于很多大规模数据处理问题。到目前为止,Google有上万个各种不同的算法问题和程序都使用MapReduce进行处理。
MapReduce的推出给大数据并行处理带来了巨大的革命性影响,使其已经成为事实上的大数据处理的工业标准。尽管MapReduce还有很多局限性,但普遍公认,MapReduce是到目前为止最为成功、最广为接受和最易于使用的大数据并行处理技术。
MapReduce的发展普及和带来的巨大影响远远超出了发明者和开源社区当初的意料,以至于马里兰大学教授、2010年出版的Data-Intensive Text Processing with MapReduce的作者Jimmy Lin在书中提出:

“MapReduce改变了我们组织大规模计算的方式,它代表了第一个有别于冯·诺依曼结构的计算模型,是在集群规模而非单个机器上组织大规模计算的新的抽象模型上的第一个重大突破,是到目前为止所见到的最为成功的基于大规模计算资源的计算模型”。

Google最初的MapReduce框架如下图所示,后来发展成了Hadoop的MapReduce框架。
MapReduce编程模型详解— —以经典Word Count为例
在Google论文中提出的编程模型如下图,包括map和reduce两个过程,Hadoop的MapReduce框架完整地继承了Google的思想,无论是MR的组件、阶段还是编程时的类结构都一律全盘接收。
MapReduce编程模型详解— —以经典Word Count为例

MapReduce示例程序WordCount过程

以Hadoop 2.7.3版本为例,MR所有示例在安装包的share/mapreduce目录下,名称为hadoop-mapreduce-examples-2.7.3.jar, 除了WordCount示例,其中还包括数据排序、二次排序、用户比赛的TeraSort和各种字符串处理程序,这些程序都是初学者学习MR的宝贵资料,但常常被忽略,实在可惜!从GitHub上可获取示例程序对应的源代码,细细研读体会,融会贯通,必有所得。
WordCount实现的示意图如下图所示,包括数据输入input、数据分片split、map、combine(相对于map端的reduce)、Shuffle&sort(混洗和排序,是由MR框架自动完成的)、reduce和输出ouput。
MapReduce编程模型详解— —以经典Word Count为例

WordCount源码解析

package com.hikdata.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * HADOOP自带WordCount示例
 * MR程序的基本机构包括一个Mapper的实现、一个Reducer的实现和main函数
 * map(映射操作)由Mapper类的map函数实现
 * reduce(规约操作)由Reducer类的reduce函数实现
 */
public class WordCount {
    /**
     * Mapper子类,继承Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * 通过查看Mapper类的定义,四个泛型参数分别代表
     * <br>KEYIN<br/>:输入键值对的键(key)类型,在WordCount中代表每一行数据相对起始位置的偏移量,
     * 虽然标记为Object类型,但在实际运行过程中是LongWritable类型(相当于Long),
     * 在以单行文本为处理单元的MR程序中,此类型必须要能被MR框架处理,如果不能正确转换(如标记为Date类型),则会抛出错误
     *
     * <br>VALUEIN<br/>:输入键值对的值(value)类型,map阶段输入的值,在WC程序中为一行行的文本
     *
     * Map阶段的输出要根据业务灵活选择,不能局限
     * <br>KEYOUT<br/>:输出键值对的键(key)类型,在WC程序中为每一个分割出的单词
     * <br>VALUEOUT<br/>:输出键值对的键(value)类型,在WC程序中为每一个分割出的单词对应的计数
     */
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable>{
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        /**
         * 
         * @param key 输入的键,文本相对起始位置的偏移量
         * @param value 输入的值,每行文本
         * @param context 全局对象,负责联通Map和Reduce,可简单理解为全局环境变量
         * @throws IOException
         * @throws InterruptedException
         */
        public void map(Object key, Text value, Mapper.Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    /**
     * Reducer子类,继承Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * 四个泛型参数分别代表
     * <br>KEYIN<br/>:输入键值对的键(key)类型,在WC中代表每个单词
     * <br>VALUEIN<br/>:输入键值对的值(value)类型,在WC程序中每个单词的计数
     *
     * <br>KEYOUT<br/>:最终输出键值对的键(key)类型,在WC程序中为每一个分割出的单词
     * <br>VALUEOUT<br/>:最终输出键值对的键(value)类型,在WC程序中为每一个分割出的单词所有计数的和
     */
    public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        /**
         * 
         * @param key 每一个单词
         * @param values Map阶段每一个单词的计数汇总,格式为(1,1,1,...),
         *   其中的每个数字是由每一个Mapper统计出来的中间值,
         *   Reduce负责将这些值汇总统计,形成最后的输出
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        /**
         * 设置Combiner的作用是让每个Mapper汇总Map阶段的值,
         * 目的在于减少网络传输,提高IO效率
         */
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //输入路径
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        //输出路径
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

一点理解

WordCount的需求极其简单,但实现并不简单;
MapReduce编程思想极其简单,但MapReduce框架的实现又极其复杂;
从简单入门,逐渐深入,理解简单背后的复杂机制,知其然并知其所以然,才能学以致用、灵活使用。