hadoop学习笔记(四)MapReduce常见实例一:去重、求平均值

去重 

问题:有一个名为buyer_favorite1的数据文件,记录了用户收藏的商品以及收藏的日期 文件中包含(用户id,商品id,收藏日期)三个字段,数据内容以“\t”分割 要求根据商品id进行去重,统计用户收藏商品中都有哪些商品被收藏,输出去重后所有的商品id和收藏了该商品的用户id 。

数据内容如下:

用户id   商品id    收藏日期
10181   1000481   2010-04-04 16:54:31
20001   1001597   2010-04-07 15:07:52
20001   1001560   2010-04-07 15:08:27
20042   1001368   2010-04-08 08:20:30
20067   1002061   2010-04-08 16:45:33
20056   1003289   2010-04-12 10:50:55
20056   1003290   2010-04-12 11:57:35
20056   1003292   2010-04-12 12:05:29
20054   1002420   2010-04-14 15:24:12
20055   1001679   2010-04-14 19:46:04
20054   1010675   2010-04-14 15:23:53
20054   1002429   2010-04-14 17:52:45
20076   1002427   2010-04-14 19:35:39
20054   1003326   2010-04-20 12:54:44
20056   1002420   2010-04-15 11:24:49
20064   1002422   2010-04-15 11:35:54
20056   1003066   2010-04-15 11:43:01
20056   1003055   2010-04-15 11:43:06
20056   1010183   2010-04-15 11:45:24
20056   1002422   2010-04-15 11:45:49
20056   1003100   2010-04-15 11:45:54
20056   1003094   2010-04-15 11:45:57
20056   1003064   2010-04-15 11:46:04
20056   1010178   2010-04-15 16:15:20
20076   1003101   2010-04-15 16:37:27
20076   1003103   2010-04-15 16:37:05
20076   1003100   2010-04-15 16:37:18
20076   1003066   2010-04-15 16:37:31
20054   1003103   2010-04-15 16:40:14
20054   1003100   2010-04-15 16:40:16

“数据去重”主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。

数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。

在MapReduce流程中,map的输出<key,value>经过shuffle过程聚集成<key,value-list>后交给reduce。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。

具体就是reduce的输入应该以数据作为key,而对value-list则没有要求(可以设置为空)。当reduce接收到一个<key,value-list>时就直接将输入的key复制到输出的key中,并将value设置成空值,然后输出<key,value>。

MaprReduce去重流程如下图所示:

           hadoop学习笔记(四)MapReduce常见实例一:去重、求平均值

 

package MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import java.io.IOException;

// buyer_favorite1中包含(用户id,商品id,收藏日期)三个字段,数据内容以“\t”分割
// 要求根据商品id进行去重,统计用户收藏商品中都有哪些商品被收藏,输出去重后所有的商品id和收藏了该商品的用户id
public class Filter {

    public static class Map extends Mapper<Object , Text, Text , Text> {
        private static Text newKey=new Text();//从输入中得到的每行的数据的类型
        private static Text newValue=new Text();
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            String line=value.toString();
            String arr[]=line.split("\t");//把输入的value用split()方法截取
            newKey.set(arr[1]);//截取出的商品id字段设置为key
            newValue.set(arr[0]);
            context.write(newKey, newValue);
        }
    }

    // shuffle自动根据key值进行排序,天然去重
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public static Text myValue = new Text();
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            String myList = new String();
            for (Text value : values)
                myList += value.toString() + ";";//输出所有收藏了该商品的用户id
            myValue.set(myList);
            context.write(key,myValue);
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf=new Configuration();
        Job job =new Job(conf,"filter");
        job.setJarByClass(Filter.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path in=new Path("hdfs://localhost:9000/mr/in/buyer_favorite1");
        Path out=new Path("hdfs://localhost:9000/mr/out/filter/buyer_favorite1");
        FileInputFormat.addInputPath(job,in);
        FileOutputFormat.setOutputPath(job,out);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

 执行结果如下:

            hadoop学习笔记(四)MapReduce常见实例一:去重、求平均值

 

求平均数

问题:现有关于商品点击情况的数据文件goods_click,包含两个字段(商品分类,商品点击次数),分隔符“\t”。

数据内容如下: 

商品分类 商品点击次数
52127	5
52120	93
52092	93
52132	38
52006	462
52109	28
52109	43
52132	0
52132	34
52132	9
52132	30
52132	45
52132	24
52009	2615
52132	25
52090	13
52132	6
52136	0
52090	10
52024	347

求平均数是MapReduce比较常见的算法,求平均数的算法也比较简单,一种思路是Map端读取数据,在数据输入到Reduce之前先经过shuffle,将map函数输出的key值相同的所有的value值形成一个集合value-list,然后将输入到Reduce端,Reduce端汇总并且统计记录数,然后作商即可。

具体原理如下图所示:

           hadoop学习笔记(四)MapReduce常见实例一:去重、求平均值 

 

package MapReduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

//goods_click包含两个字段(商品分类id,商品点击次数),分隔符“\t”
public class Average {

    public static class Map extends Mapper<Object , Text , Text , IntWritable>{
        private static Text newKey=new Text();
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
            String arr[]=value.toString().split("\t");//将输入的value值通过split()方法截取出来
            newKey.set(arr[0]);//把商品分类id字段设置为key
            IntWritable click=new IntWritable(Integer.parseInt(arr[1]));//把截取的商品点击次数字段转化为IntWritable类型并将其设置为value
            context.write(newKey, click);//直接输出key,value的值
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
        public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
            int num=0;
            int count=0;
            for(IntWritable val:values){
                num+=val.get();//把里面的每个元素求和
                count++;//统计values中元素的个数
            }
            IntWritable avg=new IntWritable(num/count);//用num除以count得到平均值avg
            context.write(key,avg);//将avg设置为value,直接输出<key,value>
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
        Configuration conf=new Configuration();
        Job job =new Job(conf,"Average");
        job.setJarByClass(Average.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path in=new Path("hdfs://localhost:9000/mr/in/goods_click");
        Path out=new Path("hdfs://localhost:9000/mr/out/Average/goods_click");
        FileInputFormat.addInputPath(job,in);
        FileOutputFormat.setOutputPath(job,out);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}