hadoop系列三:mapreduce的使用(一)

一:说明

 

此为大数据系列的一些博文,有空的话会陆续更新,包含大数据的一些内容,如hadoop,spark,storm,机器学习等。

 当前使用的hadoop版本为2.6.4

 

上一篇:hadoop系列二:HDFS文件系统的命令及JAVA客户端API

 

在下面可以看到统计一本小说(斗破苍穹)哪些词语出现了最多。

本来mapreducer只想写一篇的,可是发现写一篇太长了,所以就进行了拆分。

所有的部分都提供代码下载

目录可以在右侧查看,点击目录跳转到相应的位置

 

hadoop系列三:mapreduce的使用(一)
一:说明
二:wordcount字数统计功能
2.1:准备文件
2.2:编写Mapper的代码
2.3编写Reduce的代码
2.4:编写main方法执行这个mapreduce
2.5:把代码放在hadoop中运行
三:自定义序列化的类
3.1:自定义一个序列化的输出bean
3.2:编写mapper
3.3:编写reducer
 3.4:编写main方法
3.5:在hadoop中运行
四:数据分区(按照不同类型输出到不同的位置)
4.1:分区规则的代码
4.2:设置分区代码
4.3:分区的完整代码
4.4:在hadoop运行分区代码
五:数据排序及对象的重用
5.1:编写排序代码
5.2:编写mapper(对象的复用)
5.3:编写reducer
5.4:编写启动类
5.5:完整的代码
5.6:在hadoop中执行排序
六:统计一本小说中出现的词汇(包含Combiner)
6.1:准备工作
6.2:配置maven打包包含分词的依赖
6.3:数据汇总(Combiner)
6.4:排序阶段
hadoop系列三:mapreduce的使用(一)

 

 

 

二:wordcount字数统计功能

相应的代码在:代码地址--点我跳转

2.1:准备文件

既然是要统计字数,那么肯定是要有相应的文档,我们先准备一些这样的文档,我们准备两个文档,分别叫text1.txt和text2.txt

text1.txt

 

hello zhangsan
lisi nihao
hai zhangsan
nihao lisi
x xiaoming

 

text2.txt

1
2
3
4
5
6
zhangsan a
lisi b
wangwu c
jiji 7
haha xiaoming
xiaoming is gril

我们生成这样两个文件,待会去统计每个单词分别出现了多少次

 

 

2.2:编写Mapper的代码

 

直接贴上代码,相应的解释在注释中

hadoop系列三:mapreduce的使用(一)
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 这部分的输入是由mapreduce自动读取进来的
 * 简单的统计单词出现次数<br>
 * KEYIN 默认情况下,是mapreduce所读取到的一行文本的起始偏移量,Long类型,在hadoop中有其自己的序列化类LongWriteable
 * VALUEIN 默认情况下,是mapreduce所读取到的一行文本的内容,hadoop中的序列化类型为Text
 * KEYOUT 是用户自定义逻辑处理完成后输出的KEY,在此处是单词,String
 * VALUEOUT 是用户自定义逻辑输出的value,这里是单词出现的次数,Long
 * @author Administrator
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        //这是mapreduce读取到的一行字符串
        String line = value.toString();
        String[] words = line.split(" ");
        
        for (String word : words) {
            //将单词输出为key,次数输出为value,这行数据会输到reduce中
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
hadoop系列三:mapreduce的使用(一)

 

 

2.3编写Reduce的代码

同样直接上代码

hadoop系列三:mapreduce的使用(一)
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 第一个Text: 是传入的单词名称,是Mapper中传入的
 * 第二个:LongWritable 是该单词出现了多少次,这个是mapreduce计算出来的,比如 hello出现了11次
 * 第三个Text: 是输出单词的名称 ,这里是要输出到文本中的内容
 * 第四个LongWritable: 是输出时显示出现了多少次,这里也是要输出到文本中的内容
 * @author Administrator
 *
 */
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable num : values) {
            count += num.get();
        }
        context.write(key, new LongWritable(count));
    }
}
hadoop系列三:mapreduce的使用(一)

 

 

 

2.4:编写main方法执行这个mapreduce

写了mapper与reduce的代码,自然是需要一个main方法来把这些代码运行起来的,所以编写如下代码

 

hadoop系列三:mapreduce的使用(一)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * 相当于运行在yarn中的客户端
 * @author Administrator
 *
 */
public class WordCountDriver {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //如果是打包在linux上运行,则不需要写这两行代码
/*        //指定运行在yarn中
        conf.set("mapreduce.framework.name", "yarn");
        //指定resourcemanager的主机名
        conf.set("yarn.resourcemanager.hostname", "server1");*/
        Job job = Job.getInstance(conf);
        
        //使得hadoop可以根据类包,找到jar包在哪里
        job.setJarByClass(WordCountDriver.class);
        
        //指定Mapper的类
        job.setMapperClass(WordCountMapper.class);
        //指定reduce的类
        job.setReducerClass(WordCountReduce.class);
        
        //设置Mapper输出的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        //设置最终输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        
        //指定输入文件的位置,这里为了灵活,接收外部参数
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定输入文件的位置,这里接收启动参数
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //将job中的参数,提交到yarn中运行
        //job.submit();
        try {
            job.waitForCompletion(true);
            //这里的为true,会打印执行结果
        } catch (ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
hadoop系列三:mapreduce的使用(一)

 

 

 

2.5:把代码放在hadoop中运行

代码写完了,要怎么运行呢?

(1)首先,肯定不是直接执行main方法运行,因为目前的代码,并不知道hadoop部署在哪里,我们要做的是,把这个项目打包,如果是maven项目,则使用maven package命令打包,把相应的jar包,上传到服务器中。

(2)其次,需要把之前的两个文本文件,text1.txt和text2.txt上传到hdfs中,因为既然是大数据,那么在实际环境中,肯定不可能是这么小的数据来进行计算,肯定是有着大量的数据,而这些数据,靠一台服务器肯定是放不下去的,也只有像hdfs这种大文件存储,或者一些其它的专门存放大数据的地方,才能存放了,我们使用如下的命令,把文件上传到hdfs中,如果这些命令看不懂,可以先看上一章节,hdfs的使用。

 

//创建一个目录
hadoop fs -mkdir -p /wordcount/input
//上传文件
hadoop fs -put text1.txt text2.txt /wordcount/input

 

(3)运行代码,带有main方法的代码,是可以使用java命令运行的,但是因为hadoop依赖了很多别的jar包,这样子运行代码,需要添加大量的依赖,写的命令很复杂,hadoop提供了这样的一个命令来执行代码

hadoop jar wordcount.jar com.zxj.hadoop.demo.mapreduce.wordcount.WordCountDriver /wordcount/input /wordcount/output

 这里来解释一下这条命令的意思,jar说明使用hadoop中内置的jar命令,也就是执行一个jar包。wordcount.jar 这个是上传的代码,也就是我们之前写的代码,打包之后上传到服务器中的名字。com.zxj.hadoop.demo.mapreduce.wordcount.WordCountDriver是需要运行哪个类,因为一个jar包中有可能有多个main方法,这样可以指定使用哪个类启动。最后两个参数 /wordcount/input 和 /wordcount/output,这是我们的代码中自定义的两个参数,第一个是文件的目录(意味着可以读取一整个目录中的多个文件),第二个是输出结果的目录。

执行完成之后,会有如下结果,如果没有抛出异常,或者写明失败,带有success的就是成功了。

hadoop系列三:mapreduce的使用(一)

现在我们可以去看一下输出结果

查看输出的文件

 

hadoop fs -ls /wordcount/output

 

hadoop系列三:mapreduce的使用(一)

第一个文件代表执行成功,第二个文件是输出结果文件,执行如下命令查看

hadoop系列三:mapreduce的使用(一)

从上图发现,zhangsan出现了3次,xiaoming出现了3次,nihao出现了2次,其它的是1次

 

 

 

 

 

 

三:自定义序列化的类

代码地址:下载代码

当输出的结果比较复杂的时候,就没办法使用Text,LongWritable这种类型来输出,这个时候我们可以自定义一个序列化的类,这个序列化不是jdk的序列化,而是hadoop自已的序列化,我们需要实现它

如下文档,保存并命名为staff.txt:

 

hadoop系列三:mapreduce的使用(一)
张三    江西    打车    200
李四    广东    住宿    600
王五    北京    伙食    320
张三    江西    话费    50
张三    湖南    打车    900
周六    上海    采购    3000
李四    *    旅游    1000
王五    北京    借款    500
李四    上海    话费    50
周六    北京    打车    600
张三    广东    租房    3050
hadoop系列三:mapreduce的使用(一)

 

 

3.1:自定义一个序列化的输出bean

之前我们一直使用LongWriteable或者Text来作为输入的内容,但是如果看这两个对象的源码,它们都是实现了Writable接口的,这是一个hadoop自带的序列化接口。

现在我们要输出一些信息,单单靠一个Text已经无法达到我们的效果的时候,我们就可以自定义一个对象,然后实现Writable接口

如下的代码,就是自定义一个可序列化的bean

 

hadoop系列三:mapreduce的使用(一)
    /**
     * 封装的bean
     */
    public static class SpendBean implements Writable{

        private Text userName;

        private IntWritable money;

        public SpendBean(Text userName, IntWritable money) {
            this.userName = userName;
            this.money = money;
        }

        /**
         * 反序列化时必须有一个空参的构造方法
         */
        public SpendBean(){}

        /**
         * 序列化的代码
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            userName.write(out);
            money.write(out);
        }

        /**
         * 反序列化的代码
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            userName = new Text();
            userName.readFields(in);
            money = new IntWritable();
            money.readFields(in);
        }

        public Text getUserName() {
            return userName;
        }

        public void setUserName(Text userName) {
            this.userName = userName;
        }

        public IntWritable getMoney() {
            return money;
        }

        public void setMoney(IntWritable money) {
            this.money = money;
        }

        @Override
        public String toString() {
            return userName.toString() + "," + money.get();
        }
    }
hadoop系列三:mapreduce的使用(一)

 

 

 

 

3.2:编写mapper

 编写mapper

 

hadoop系列三:mapreduce的使用(一)
    /**
     * Mapper
     */
    public static class GroupUserMapper extends Mapper<LongWritable,Text,Text,SpendBean>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String val = value.toString();
            String[] split = val.split("\t");
            //这里就不作字符串异常的处理了,核心代码简单点
            String name = split[0];
            String province = split[1];
            String type = split[2];
            int money = Integer.parseInt(split[3]);
            SpendBean groupUser = new SpendBean();
            groupUser.setUserName(new Text(name));
            groupUser.setMoney(new IntWritable(money));
            context.write(new Text(name),groupUser);
        }
    }
hadoop系列三:mapreduce的使用(一)

 

 

3.3:编写reducer

编写reducer

 

hadoop系列三:mapreduce的使用(一)
/**
     * reducer
     */
    public static class GroupUserReducer extends Reducer<Text,SpendBean,Text,SpendBean> {
        /**
         * 姓名
         * @param key
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<SpendBean> values, Context context) throws IOException, InterruptedException {
            int money = 0;//消费金额
            //遍历
            for(SpendBean bean : values){
                money += bean.getMoney().get();
            }
            //输出汇总结果
            context.write(key,new SpendBean(key,new IntWritable(money)));
        }
    }
hadoop系列三:mapreduce的使用(一)

 

 

 3.4:编写main方法

编写main方法

hadoop系列三:mapreduce的使用(一)
    /**
     * 编写启动类
     * @param args
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(GroupUser.class); //设置jar中的启动类,可以根据这个类找到相应的jar包

        job.setMapperClass(GroupUserMapper.class); //设置mapper的类
        job.setReducerClass(GroupUserRecuder.class); //设置reducer的类

        job.setMapOutputKeyClass(Text.class); //mapper输出的key
        job.setMapOutputValueClass(SpendBean.class); //mapper输出的value

        job.setOutputKeyClass(Text.class); //最终输出的数据类型
        job.setOutputValueClass(SpendBean.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));//输入的文件位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出的文件位置

        boolean b = job.waitForCompletion(true);//等待完成,true,打印进度条及内容
        if(b){
            //success
        }

    }
hadoop系列三:mapreduce的使用(一)

 

完整的代码如下,这里把几个类都写在一起了。

 

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.staffspend.groupuser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author 朱小杰
 * 时间 2017-07-23 .16:33
 * 说明 ...
 */
public class GroupUser {
    /**
     * Mapper
     */
    public static class GroupUserMapper extends Mapper<LongWritable,Text,Text,SpendBean>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String val = value.toString();
            String[] split = val.split("\t");
            //这里就不作字符串异常的处理了,核心代码简单点
            String name = split[0];
            String province = split[1];
            String type = split[2];
            int money = Integer.parseInt(split[3]);
            SpendBean groupUser = new SpendBean();
            groupUser.setUserName(new Text(name));
            groupUser.setMoney(new IntWritable(money));
            context.write(new Text(name),groupUser);
        }
    }

    /**
     * reducer
     */
    public static class GroupUserReducer extends Reducer<Text,SpendBean,Text,SpendBean> {
        /**
         * 姓名
         * @param key
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<SpendBean> values, Context context) throws IOException, InterruptedException {
            int money = 0;//消费金额
            //遍历
            for(SpendBean bean : values){
                money += bean.getMoney().get();
            }
            //输出汇总结果
            context.write(key,new SpendBean(key,new IntWritable(money)));
        }
    }

    /**
     * 封装的bean
     */
    public static class SpendBean implements Writable{

        private Text userName;

        private IntWritable money;

        public SpendBean(Text userName, IntWritable money) {
            this.userName = userName;
            this.money = money;
        }

        /**
         * 反序列化时必须有一个空参的构造方法
         */
        public SpendBean(){}

        /**
         * 序列化的代码
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            userName.write(out);
            money.write(out);
        }

        /**
         * 反序列化的代码
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            userName = new Text();
            userName.readFields(in);
            money = new IntWritable();
            money.readFields(in);
        }

        public Text getUserName() {
            return userName;
        }

        public void setUserName(Text userName) {
            this.userName = userName;
        }

        public IntWritable getMoney() {
            return money;
        }

        public void setMoney(IntWritable money) {
            this.money = money;
        }

        @Override
        public String toString() {
            return userName.toString() + "," + money.get();
        }
    }


    /**
     * 编写启动类
     * @param args
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(GroupUser.class); //设置jar中的启动类,可以根据这个类找到相应的jar包

        job.setMapperClass(GroupUserMapper.class); //设置mapper的类
        job.setReducerClass(GroupUserReducer.class); //设置reducer的类

        job.setMapOutputKeyClass(Text.class); //mapper输出的key
        job.setMapOutputValueClass(SpendBean.class); //mapper输出的value

        job.setOutputKeyClass(Text.class); //最终输出的数据类型
        job.setOutputValueClass(SpendBean.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));//输入的文件位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出的文件位置

        boolean b = job.waitForCompletion(true);//等待完成,true,打印进度条及内容
        if(b){
            //success
        }

    }
}
hadoop系列三:mapreduce的使用(一)

 

 

3.5:在hadoop中运行

然后执行maven clean package命令,重新打包,并且上传到服务器中。

我们也创建一个目录,来存放之前的员工消费信息

hadoop fs -mkdir -p /staffspend/input

把之前准备好的员工文件上传到这个目录

hadoop fs -put staff.txt /staffspend/input

然后准备执行任务

 

hadoop jar hadoop-mapreduce-1.0.jar com.zxj.hadoop.demo.mapreduce.staffspend.groupuser.GroupUser /staffspend/input /staffspend/output

执行成功后,查看输出文件

hadoop fs -cat /staffspend/output/part-r-00000

hadoop系列三:mapreduce的使用(一)

 

 

 

 

 

四:数据分区(按照不同类型输出到不同的位置)

下载代码:点我下载

这样的需求也经常会有,我可能并不是仅仅需要总的数据查看,我还可能要查看每一个类型,比如第三部分的文件中,我可能想分别查看每个省中,每个人分别用了多少钱。

这个时候我们对上第三部分的代码进行修改

我们要增加输出bean中的省份字段,红色位置是修改过的部分

hadoop系列三:mapreduce的使用(一)
/**
     * 封装的bean
     */
    public static class SpendBean implements Writable{

        private Text userName;

        private IntWritable money;

        private Text province;


        public SpendBean(Text userName, IntWritable money, Text province) {
            this.userName = userName;
            this.money = money;
            this.province = province;
        }

        /**
         * 反序列化时必须有一个空参的构造方法
         */
        public SpendBean(){}

        /**
         * 序列化的代码
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            userName.write(out);
            money.write(out);
            province.write(out);
        }

        /**
         * 反序列化的代码
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            userName = new Text();
            userName.readFields(in);
            money = new IntWritable();
            money.readFields(in);
            province = new Text();
            province.readFields(in);
        }

        public Text getUserName() {
            return userName;
        }

        public Text getProvince() {
            return province;
        }

        public void setProvince(Text province) {
            this.province = province;
        }

        public void setUserName(Text userName) {
            this.userName = userName;
        }

        public IntWritable getMoney() {
            return money;
        }

        public void setMoney(IntWritable money) {
            this.money = money;
        }

        @Override
        public String toString() {
            return "SpendBean{" +
                    "userName=" + userName +
                    ", money=" + money +
                    ", province=" + province +
                    '}';
        }
    }
hadoop系列三:mapreduce的使用(一)

可以看到,上面的bean并没有改动什么特别的东西,完全是加了一个省份字段而已。

 

 

4.1:分区规则的代码

首先,如果要按照数据进行分区,我们肯定需要写分区的代码来告诉hadoop,我们写一个分区的类来继承org.apache.hadoop.mapreduce.Partitioner

hadoop中的分区,是在mapper结束后的reducer中,所以下面的代码是在reducer时运行的,我们对不同的省份进行规则划分,比如说江西就是对应的0分区

具体代码如下:

 

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.staffspend.groupuser;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .11:14
 * 说明
 * key ,value是mapper中输出的类型,因为分区是在mapper完成之后进行的
 */
public class ProvincePartitioner extends Partitioner<Text, GroupUser.SpendBean> {
    private static Map<String,Integer> provinces = new HashMap<>();
    static {
        //这里给每一个省份编制一个分区
        provinces.put("江西",0);
        provinces.put("广东",1);
        provinces.put("北京",2);
        provinces.put("湖南",3);
        provinces.put("上海",4);
        provinces.put("*",5);
    }

    /**
     * 给指定的数据一个分区
     * @param text
     * @param spendBean
     * @param numPartitions
     * @return
     */
    @Override
    public int getPartition(Text text, GroupUser.SpendBean spendBean, int numPartitions) {
        Integer province = provinces.get(spendBean.getProvince().toString());
        province = province == null ? 6 : province;  //如果在省份列表中找不到,则指定一个默认的分区
        return province;
    }
}
hadoop系列三:mapreduce的使用(一)

 

很简单的代码,我们划分了6个分区,如果有的省份在这6个分区中找不到,那余下的就会进入第7个分区中。

 

 

4.2:设置分区代码

分区的代码既然写完了,那么就需要在运行的时候,指定这分区的规则是我们刚才写的代码,位置在运行的main方法中,如下:

红色部分是重点部分,也是改过的部分

 

hadoop系列三:mapreduce的使用(一)
/**
     * 编写启动类
     * @param args
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(GroupUser.class); //设置jar中的启动类,可以根据这个类找到相应的jar包

        job.setMapperClass(GroupUserMapper.class); //设置mapper的类
        job.setReducerClass(GroupUserReducer.class); //设置reducer的类

        job.setPartitionerClass(ProvincePartitioner.class);//指定数据分区规则,不是必须要的,根据业务需求分区
        job.setNumReduceTasks(7); //设置相应的reducer数量,这个数量要与分区的大最数量一致

        job.setMapOutputKeyClass(Text.class); //mapper输出的key
        job.setMapOutputValueClass(SpendBean.class); //mapper输出的value

        job.setOutputKeyClass(Text.class); //最终输出的数据类型
        job.setOutputValueClass(SpendBean.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));//输入的文件位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出的文件位置

        boolean b = job.waitForCompletion(true);//等待完成,true,打印进度条及内容
        if(b){
            //success
        }

    }
hadoop系列三:mapreduce的使用(一)

 这里再说明一下

job.setNumReduceTasks(7);如果

 如果这个数值是1,那么所有的数据全部会输出到一个文件中。

假如是2,那么将会报错。

假如超出分区大小,比如写一个10,那么多出来的文件将会为空。所以一般是按最大需要分区数量写。

 

 

4.3:分区的完整代码

下面贴出完整的代码

分区代码:

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.staffspend.groupuser;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .11:14
 * 说明
 * key ,value是mapper中输出的类型,因为分区是在mapper完成之后进行的
 */
public class ProvincePartitioner extends Partitioner<Text, GroupUser.SpendBean> {
    private static Map<String,Integer> provinces = new HashMap<>();
    static {
        //这里给每一个省份编制一个分区
        provinces.put("江西",0);
        provinces.put("广东",1);
        provinces.put("北京",2);
        provinces.put("湖南",3);
        provinces.put("上海",4);
        provinces.put("*",5);
    }

    /**
     * 给指定的数据一个分区
     * @param text
     * @param spendBean
     * @param numPartitions
     * @return
     */
    @Override
    public int getPartition(Text text, GroupUser.SpendBean spendBean, int numPartitions) {
        Integer province = provinces.get(spendBean.getProvince().toString());
        province = province == null ? 6 : province;  //如果在省份列表中找不到,则指定一个默认的分区
        return province;
    }
}
hadoop系列三:mapreduce的使用(一)

其它代码,这些代码是写在一个文件中了

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.staffspend.groupuser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author 朱小杰
 * 时间 2017-07-23 .16:33
 * 说明 ...
 */
public class GroupUser {
    /**
     * Mapper
     */
    public static class GroupUserMapper extends Mapper<LongWritable,Text,Text,SpendBean>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String val = value.toString();
            String[] split = val.split("\t");
            //这里就不作字符串异常的处理了,核心代码简单点
            String name = split[0];
            String province = split[1];
            String type = split[2];
            int money = Integer.parseInt(split[3]);
            SpendBean groupUser = new SpendBean();
            groupUser.setUserName(new Text(name));
            groupUser.setMoney(new IntWritable(money));
            groupUser.setProvince(new Text(province));
            context.write(new Text(name),groupUser);
        }
    }

    /**
     * reducer
     */
    public static class GroupUserReducer extends Reducer<Text,SpendBean,Text,SpendBean> {
        /**
         * 姓名
         * @param key
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<SpendBean> values, Context context) throws IOException, InterruptedException {
            int money = 0;//消费金额
            //遍历
            Text province = null;
            for(SpendBean bean : values){
                money += bean.getMoney().get();
                province = bean.getProvince();
            }
            //输出汇总结果
            context.write(key,new SpendBean(key,new IntWritable(money),province));
        }
    }

    /**
     * 封装的bean
     */
    public static class SpendBean implements Writable{

        private Text userName;

        private IntWritable money;

        private Text province;


        public SpendBean(Text userName, IntWritable money, Text province) {
            this.userName = userName;
            this.money = money;
            this.province = province;
        }

        /**
         * 反序列化时必须有一个空参的构造方法
         */
        public SpendBean(){}

        /**
         * 序列化的代码
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            userName.write(out);
            money.write(out);
            province.write(out);
        }

        /**
         * 反序列化的代码
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            userName = new Text();
            userName.readFields(in);
            money = new IntWritable();
            money.readFields(in);
            province = new Text();
            province.readFields(in);
        }

        public Text getUserName() {
            return userName;
        }

        public Text getProvince() {
            return province;
        }

        public void setProvince(Text province) {
            this.province = province;
        }

        public void setUserName(Text userName) {
            this.userName = userName;
        }

        public IntWritable getMoney() {
            return money;
        }

        public void setMoney(IntWritable money) {
            this.money = money;
        }

        @Override
        public String toString() {
            return "SpendBean{" +
                    "userName=" + userName +
                    ", money=" + money +
                    ", province=" + province +
                    '}';
        }
    }


    /**
     * 编写启动类
     * @param args
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(GroupUser.class); //设置jar中的启动类,可以根据这个类找到相应的jar包

        job.setMapperClass(GroupUserMapper.class); //设置mapper的类
        job.setReducerClass(GroupUserReducer.class); //设置reducer的类

        job.setPartitionerClass(ProvincePartitioner.class);//指定数据分区规则,不是必须要的,根据业务需求分区
        job.setNumReduceTasks(7); //设置相应的reducer数量,这个数量要与分区的大最数量一致

        job.setMapOutputKeyClass(Text.class); //mapper输出的key
        job.setMapOutputValueClass(SpendBean.class); //mapper输出的value

        job.setOutputKeyClass(Text.class); //最终输出的数据类型
        job.setOutputValueClass(SpendBean.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));//输入的文件位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出的文件位置

        boolean b = job.waitForCompletion(true);//等待完成,true,打印进度条及内容
        if(b){
            //success
        }

    }
}
hadoop系列三:mapreduce的使用(一)

 

 

 

 

4.4:在hadoop运行分区代码

我们重新打包项目后,重新上传到服务器中,直接执行命令运行

hadoop jar hadoop-mapreduce-1.0.jar com.zxj.hadoop.demo.mapreduce.staffspend.groupuser.GroupUser /staffspend/input /staffspend/output2

结果会发现reducer的过程,明显慢了下来,因为是在reducer中分区,所以自然会慢了一些。

执行完成后,我们查看输出列表

 

hadoop fs -ls /staffspend/output2

 

hadoop系列三:mapreduce的使用(一)

可以看到,这里有7个文件,对应着7个分区,执行命令查看内容

hadoop系列三:mapreduce的使用(一)

可以看到,这其中的数据,就是在一个省份中,每个人分别花了多少钱

 

 

 

 

 

 

 

五:数据排序及对象的重用

下载代码:点我下载

这一部分会讲到数据的排序,这种需求也是会经常会有的,比如上面的例子中,我就想知道公司哪个员工的经费是最多的。

其次就是对象的重用,既然是大数据,那么map的次数远远不止上亿这么简单,我们每次都要重复创建一个bean吗?

先准备一些数据,我们也可以用之前计算出来的数据,但是由于之前打印的格式不好,是toString()的默认格式,所以我这里再准备一份数据

 

hadoop系列三:mapreduce的使用(一)
张三    2980
李四    8965
王五    1987
小黑    6530
小陈    2963
小梅    980
hadoop系列三:mapreduce的使用(一)

 

我们开始编码

 

 

 

5.1:编写排序代码

首先再准备一份bean,这个bean和以前不一样,需要实现排序接口

 

hadoop系列三:mapreduce的使用(一)
/**
     * 我们需要实现一个新的接口,这个接口包含了排序接口以及序列化接口
     */
    public static class Spend implements WritableComparable<Spend>{
        private Text name; //姓名
        private IntWritable money; //花费

        public Spend(){}

        public Spend(Text name, IntWritable money) {
            this.name = name;
            this.money = money;
        }

        public void set(Text name, IntWritable money) {
            this.name = name;
            this.money = money;
        }
        @Override
        public int compareTo(Spend o) {
            return o.getMoney().get() - this.money.get();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            name.write(out);
            money.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name = new Text();
            name.readFields(in);
            money = new IntWritable();
            money.readFields(in);
        }


        public Text getName() {
            return name;
        }

        public void setName(Text name) {
            this.name = name;
        }

        public IntWritable getMoney() {
            return money;
        }

        public void setMoney(IntWritable money) {
            this.money = money;
        }

        @Override
        public String toString() {
            return name.toString() + "\t" + money.get();
        }
    }
hadoop系列三:mapreduce的使用(一)

 

其实这个排序接口就是jdk自带的一个排序接口,使用方法与jdk的一致,所以就不讲的太深入,主要就是靠这个接口来进行排序。

 

 

 

 

5.2:编写mapper(对象的复用)

这部分的mapper很简单,没有什么特殊要讲的内容

hadoop系列三:mapreduce的使用(一)
    public static class SortMapper extends Mapper<LongWritable,Text,Spend,Text>{
        private Spend spend = new Spend();
        private IntWritable moneyWritable = new IntWritable();
        private Text text = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");//这里就不做异常处理了,只写核心逻辑
            String name = split[0];
            int money = Integer.parseInt(split[1]);
            text.set(name);
            moneyWritable.set(money);
            spend.set(text, moneyWritable);
            context.write(spend,text);
        }
    }
hadoop系列三:mapreduce的使用(一)

代码逻辑上并没有什么可说的,因为数据已经是汇总的数据了,只是进行一个排序而已,而排序的代码又写在bean中实现的接口上了,这里主要就是讨论一下对象的复用。

因为大数据动则数十亿上百亿的数据,如果重复创建这么多对象,那么将增加GC的工作,我们可以复用它,就是把它定义在上方,在调用它的set方法,可以更新这个对象的值。

可能有人会觉得,在第二次操作这个对象的时候,那不是会改变这个对象的值吗?没错的,是会改变。那么第一次操作这方法时创建的对象,保留的引用不是也会更新值吗?答案是不会的,生成的bean一经写出,就会序列化出去,这个时候已经是一个序列化的数据了,序列化的数据在reducer中将会反序列化,这个时候,和这个对象已经没有关系了。

 

 

 

5.3:编写reducer

reducer平淡出奇,实在是没有什么可说的,直接输出结果就行

 

hadoop系列三:mapreduce的使用(一)
public static class SortReducer extends Reducer<Spend,Text,Text,Spend>{
        /**
         * 因为在这之前已经是汇总的结果了,所以这里直接输出就行了
         * @param key
         * @param values  这里面只有一个,就是姓名
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Spend key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(values.iterator().next(),key);
        }
    }
hadoop系列三:mapreduce的使用(一)

 

 

 

 

 

5.4:编写启动类

启动类与也是一样的,只不过不需要加上分区的代码

hadoop系列三:mapreduce的使用(一)
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration config = new Configuration();

        Job job = Job.getInstance(config);

        job.setJarByClass(SortGroupUser.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(Spend.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Spend.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        if(b){
            //success
        }

    }
hadoop系列三:mapreduce的使用(一)

这里的代码就没有注释了,想看注释的可以看上面部分的代码

 

 

 

5.5:完整的代码

为了防止强迫证的同学,贴出完整的代码

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.staffspend.groupuser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .15:48
 * 说明 带有排序功能的统计,
 */
public class SortGroupUser {

    public static class SortMapper extends Mapper<LongWritable,Text,Spend,Text>{
        private Spend spend = new Spend();
        private IntWritable moneyWritable = new IntWritable();
        private Text text = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");//这里就不做异常处理了,只写核心逻辑
            String name = split[0];
            int money = Integer.parseInt(split[1]);
            text.set(name);
            moneyWritable.set(money);
            spend.set(text, moneyWritable);
            context.write(spend,text);
        }
    }

    public static class SortReducer extends Reducer<Spend,Text,Text,Spend>{
        /**
         * 因为在这之前已经是汇总的结果了,所以这里直接输出就行了
         * @param key
         * @param values  这里面只有一个,就是姓名
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Spend key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(values.iterator().next(),key);
        }
    }


    /**
     * 我们需要实现一个新的接口,这个接口包含了排序接口以及序列化接口
     */
    public static class Spend implements WritableComparable<Spend>{
        private Text name; //姓名
        private IntWritable money; //花费

        public Spend(){}

        public Spend(Text name, IntWritable money) {
            this.name = name;
            this.money = money;
        }

        public void set(Text name, IntWritable money) {
            this.name = name;
            this.money = money;
        }
        @Override
        public int compareTo(Spend o) {
            return o.getMoney().get() - this.money.get();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            name.write(out);
            money.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name = new Text();
            name.readFields(in);
            money = new IntWritable();
            money.readFields(in);
        }


        public Text getName() {
            return name;
        }

        public void setName(Text name) {
            this.name = name;
        }

        public IntWritable getMoney() {
            return money;
        }

        public void setMoney(IntWritable money) {
            this.money = money;
        }

        @Override
        public String toString() {
            return name.toString() + "\t" + money.get();
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration config = new Configuration();

        Job job = Job.getInstance(config);

        job.setJarByClass(SortGroupUser.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(Spend.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Spend.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        if(b){
            //success
        }

    }
}
hadoop系列三:mapreduce的使用(一)

mapper与reducer都写在这里面了。

 

 

 

 

5.6:在hadoop中执行排序

我们把新准备的数据命令为all.txt,然后上传到服务器,再上传到hadoop的hdfs中

创建目录

 

hadoop fs -mkdir -p /staffsort/input

 

上传文件

hadoop fs -put all.txt /staffsort/input

执行运算

hadoop jar hadoop-mapreduce-1.0.jar com.zxj.hadoop.demo.mapreduce.staffspend.groupuser.SortGroupUser /staffsort/input /staffsort/output

查看输出

hadoop fs -ls /staffsort/output
hadoop fs -cat /staffsort/output/part-r-00000

hadoop系列三:mapreduce的使用(一)

OK完成

 

 

 

 

 

 

 

六:统计一本小说中出现的词汇(包含Combiner)

下载代码:点我下载

本部分涵盖了Combiner的知识点,以及在应用场景上是计算了斗破苍穹中哪些词汇出现的次数最多,达到这样一个效果,需要进行两次mapreducer,第一次是汇总,第二次是排序

 

6.1:准备工作

1:斗破苍穹.txt(自行下载)

2:中文分词器 ansj(也可以用别的)

 

        <dependency>
            <groupId>org.ansj</groupId>
            <artifactId>ansj_seg</artifactId>
            <version>5.1.1</version>
        </dependency>

 

 

 

 

6.2:配置maven打包包含分词的依赖

我们的代码是要打成jar包到hadoop中运行的,之前的代码中,我们并没有依赖其它的东西,这次我们要依赖分词器,因为hadoop中是不带有这个东西的,所以我们打包的时候,也要把这个分词器打包进来,所以我们使用maven-assembly-plugin插件。这个插件可能很多人都用过,可是你们觉得仅仅是配置打包其它的依赖这么简单吗?no!no!no!我们要打出来的包,只包含分词器呀,因为在pom文件中,还包含了hadoop的jar包,我们不需要hadoop的jar包也打进来,因为在hadoop运行环境中,这些代码是在hadoop中存在的,而且加上hadooop的jar后,打出来的包会变的特别大。

我们现在要做的是打现来的包,只包含我们自己的代码加上分词器的jar。

我们看一下怎么做,如果朋友们有更好的方案,请在评论中指点,不胜感激

hadoop系列三:mapreduce的使用(一)
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <execution>
                        <id>make-jar</id>
                        <!-- 绑定到package生命周期阶段上 -->
                        <phase>package</phase>
                        <goals>
                            <!-- 只执行一次 -->
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <descriptors> <!--描述文件路径-->
                                <descriptor>src/main/assemble/package.xml</descriptor>
                            </descriptors>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
hadoop系列三:mapreduce的使用(一)

上面是pom文件中的配置,但是上面依赖了一个其它的配置文件,我们把它建在了相应的目录,具体内容如下

hadoop系列三:mapreduce的使用(一)
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
    <!-- 生成的文件中,会带有这一部分 -->
    <id>a</id>
    <!-- 根目录中是否包含项目目录,不需要 -->
    <includeBaseDirectory>false</includeBaseDirectory>
    <formats>
        <format>jar</format>
    </formats>

    <fileSets>
        <!-- 打包本工程的代码,如果没有这部分,那么打出来的包不包含本项目的代码 -->
        <fileSet>
            <!-- ${project.build.directory}是打包后的target目录 -->
            <directory>${project.build.directory}/classes</directory>
            <outputDirectory></outputDirectory>
        </fileSet>
    </fileSets>


    <dependencySets>
        <dependencySet>
            <useProjectArtifact>true</useProjectArtifact>
            <useProjectAttachments>true</useProjectAttachments>
            <!-- 输出的位置,这是在根目录中 -->
            <outputDirectory></outputDirectory>
            <!-- 把代码解压出来,否则会是一个jar包的形式在里面 -->
            <unpack>true</unpack>
            <includes>
                <!-- 可以设置只加入这个maven的依赖 -->
                <include>org.ansj:ansj_seg</include>
                <include>org.nlpcn:nlp-lang</include>
                <include>org.nutz:nutz</include>
            </includes>
        </dependencySet>
    </dependencySets>
</assembly>
hadoop系列三:mapreduce的使用(一)

如上就配置完了

 

 

 

 

6.3:数据汇总(Combiner)

第一步,我们要对数据进行汇总,不然怎么排序呢?汇总的代码与之前wordcount差不多,但是数据量就多了,毕竟那不是我随意编写的测试数据,而是一本小说,所以这里我们用到Combiner。

简要的说一个Combiner的作用,Combiner就是在map的阶段,先进行一步汇总,减少reducer的汇总的数据量。这个马上会讲到。

 

现在先来准备一个Mapper,因为输出的就是词汇和数量,所以也不需要自定义bean

 

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.story;

import org.ansj.domain.Result;
import org.ansj.domain.Term;
import org.ansj.splitWord.analysis.ToAnalysis;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .19:00
 * 说明 统计一本小说哪些词出现的次数最多
 */
public class StoryMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private Text text = new Text();
    private LongWritable longWritable = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        //剔除空的一行
        if(!StringUtils.isBlank(line)){
            //分词的代码
            Result parse = ToAnalysis.parse(line);
            List<Term> terms = parse.getTerms();
            Iterator<Term> iterator = terms.iterator();
            while (iterator.hasNext()){
                Term term = iterator.next();
                longWritable.set(1);
                text.set(term.getName());
                context.write(text,longWritable);
            }
        }
    }
}
hadoop系列三:mapreduce的使用(一)

 

代码和以前不同的是,这里面加入了分词的代码,将每一个词,当作一个key输出。

 

reducer的代码

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.story;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .19:10
 * 说明 统计小说
 */
public class StoryReducer extends Reducer<Text, LongWritable, LongWritable, Text> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<LongWritable> iterator = values.iterator();
        long num = 0;
        while (iterator.hasNext()){
            LongWritable longWritable = iterator.next();
            num += longWritable.get();
        }
        context.write(new LongWritable(num),key);
    }
}
hadoop系列三:mapreduce的使用(一)

reducer的代码就是简单的汇总,然后将数据输出到文本中。

 

 

此时有必要说一个Combiner,我们先看一个怎么设置一个Combiner

 

Job job = ..
job.setCombinerClass(SortCombiner.class);//设置Combiner

 

再看一下Combiner中的需要传一个什么东西

hadoop系列三:mapreduce的使用(一)
  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }
hadoop系列三:mapreduce的使用(一)

是不是很奇怪,这里竟然是接收一个reducer。那我们能不能直接设置为reducer的类呢?答案是不行的,因为阶段不一样,Combiner是在运行完map后,自行汇总了一次,而Combiner汇总完之后,会再传到reducer进行大汇总。从流程上面来说,是这样子的,我草草画了一个图,可以看一下

这个是原来没有Combiner的图

hadoop系列三:mapreduce的使用(一)

这是加有Combiner的图

hadoop系列三:mapreduce的使用(一)

从流程上面看到Mapper后,如果有Combiner,会进行Combiner,再进行Reducer,也就意味着,Mapper的输出,成为了Combiner的输出,且Combiner的输出,成为了Reducer的输入。

但是Combiner需要遵循一个规则。Combiner需要作为一个可插拔的插件,可有可无,就算移除Combiner,也不会对结果造成任何影响。

为什么要使用Combiner呢?就是在各个map中预先进行一次,然后减少在reducer阶段的数据量,这样能提升很高的效率。

贴出Combiner的代码

 

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.story;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .20:03
 * 说明 ...
 */
    public class SortCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    private LongWritable longWritable = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<LongWritable> iterator = values.iterator();
        long num = 0;
        while (iterator.hasNext()){
            LongWritable longWritable = iterator.next();
            num += longWritable.get();
        }
        longWritable.set(num);
        context.write(key,longWritable);
    }
}
hadoop系列三:mapreduce的使用(一)

 

可以看到,这里的逻辑与reducer中差不多,其实就是在map阶段进行了一步汇总而已,值得关注的是,输出与输入是一样的,因为Combiner汇总后还是要交给reducer进行大汇总的。

 

 

最后看main方法,main方法也差不多,就是加上了设置Combiner的代码而已

 

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.story;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .19:14
 * 说明 ...
 */
public class StoryDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(StoryDriver.class);

        job.setMapperClass(StoryMapper.class);
        job.setReducerClass(StoryReducer.class);

        job.setCombinerClass(SortCombiner.class);//设置Combiner

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        if(b){

        }

    }
}
hadoop系列三:mapreduce的使用(一)

 

 

把小说命名为dpcq.txt,上传到hadoop中,记得文件编码哦,最好是utf-8编码

hadoop fs -mkdir -p /story/input
hadoop fs -put dpcq.txt /story/input

然后打包后,把包含分词器的jar上传到服务器并且在hadoop中运行

 

 

 

hadoop jar hadoop-mapreduce-1.0-a.jar com.zxj.hadoop.demo.mapreduce.story.StoryDriver /story/input /story/output

执行结果如下

hadoop系列三:mapreduce的使用(一)

但是这并不是我们想要的结果,我们需要它对词汇出现的数量进行排序,所以我们还要进行一个排序的mapreducer

 

 

 

 

6.4:排序阶段

通过上面的汇总,我们已经得到了每个词分别出现了多少次,这一部分我们要对其进行排序,这一部分极其简单,我们之前也看过排序是怎么做的,实现一个Comparable接口而已,但是实际上我们这里并不需要实现,因为我们是根据词汇出现的次数来排序,我们来看一个LongWritable的源码

hadoop系列三:mapreduce的使用(一)

可以想象,LongWritable已经实现了排序接口,不需要我们去处理,不过LongWritable实现的是一个正序的排序,我们要拉到最底下才能看到哪个词汇出现了最多,如果我们要看倒序排的话,我们就要自己实现咯,如下就让long类型的数据是倒序排的

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.story.sort;

import org.apache.hadoop.io.LongWritable;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .21:00
 * 说明 一个倒序的Long
 */
public class MyLongWritable extends LongWritable {
    @Override
    public int compareTo(LongWritable o) {
        if(o.get() > this.get()){
            return 1;
        }else if (o.get() == this.get()){
            return 0;
        }else{
            return -1;
        }
    }
}
hadoop系列三:mapreduce的使用(一)

这里直接继承了LongWritable,重写了它的排序代码,不过留一个悬念,为什么实现的代码不直接使用

return (int)(o.get() - this.get())

这不是会简单好多吗?为什么不使用呢?大家可以在评论里面回答哈!

 

好,我们已经定义了一个倒序的MyLongWribable,排序的时候,我们就用它好了

其它的代码就特别简单了,看mapper如下

 

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.story.sort;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .20:43
 * 说明 ...
 */
public class SortMapper extends Mapper<LongWritable, Text, MyLongWritable, Text> {
    private Text text = new Text();
    private MyLongWritable longWritable = new MyLongWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String content = value.toString().trim();
        if(!StringUtils.isBlank(content)){
            String[] split = content.split("\t");
            if(split.length == 2){
                long number = Long.parseLong(split[0]);//出现的次数
                String word = split[1];  //词汇
                longWritable.set(number);
                text.set(word);
                context.write(longWritable,text);
            }
        }
    }
}
hadoop系列三:mapreduce的使用(一)

 

如果你看明白了上面的一些说明,那么对于这里的代码,肯定是能看的懂的,否则自行回去复习哈。这里为什么输出的key是LongWritable呢?不是写自定义的MyLongWritable呢?因为这个key是hadoop传入的,这里面的值是代码着读取文件的位置,所以我们不能用我们自定义的排序Long,但是其它地方,就可以用了,比如在输出的地方

 

再看reducer的代码

 

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.story.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .20:49
 * 说明 ...
 */
public class SortReducer extends Reducer<MyLongWritable, Text, Text, MyLongWritable> {
    @Override
    protected void reduce(MyLongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        context.write(values.iterator().next(),key);
    }
}
hadoop系列三:mapreduce的使用(一)

 

再看reducer的代码,那简直是简单到没话说了,给啥就输出啥,现在我们也知道,排序是按照reducer的输入key来进行排序的,那么它就会根据我们自定义的排序规则进行排序。

 

再看main方法,我甚至都有不想贴main方法的冲动了,没什么可写的嘛。

 

hadoop系列三:mapreduce的使用(一)
package com.zxj.hadoop.demo.mapreduce.story.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .20:50
 * 说明 ...
 */
public class SortDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(SortDriver.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);


        job.setMapOutputKeyClass(MyLongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MyLongWritable.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        if(b){
            //success
        }
    }
}
hadoop系列三:mapreduce的使用(一)

 

 

代码写完了,我们就把它放到mapreducer中运行,打包上传到服务器,直接执行命令

hadoop jar  hadoop-mapreduce-1.0-a.jar com.zxj.hadoop.demo.mapreduce.story.sort.SortDriver /story/output /story/output2

自定义的参数中,第一个/story/output是上一次对小说进行词汇汇总的输出目录,因为我们排序就是要对这个输出结果进行排序,并不是乱写的哦。

执行完成之后,查看结果

hadoop系列三:mapreduce的使用(一)

出现最多的是逗号,好吧,我们应该排除标点符号的

hadoop系列三:mapreduce的使用(一)

这些词汇都是分词器进行划分的,与hadoop并无关系,如果觉得词汇表达不准,也可以换一个分词器,或者自己自定义一些词汇。