Map Join和Reduce Join的区别以及代码实现

MapReduce Join

对两份数据data1和data2进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。

如果数据量比较大,在内存进行连接操会发生OOM。mapreduce join可以用来解决大数据的连接。 

1 思路 

1.1 reduce join

在map阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。因为在shuffle阶段已经自然按key分组,reduce阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。

这种方法有2个问题:

1, map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。

2, reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。

实现代码如下:

主程序入口代码:

package com.ibeifeng.mapreduce.join;  

  

import java.io.IOException;  

import java.util.ArrayList;  

import java.util.Iterator;  

import java.util.List;  

import java.util.StringTokenizer;  

  

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.conf.Configured;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.io.LongWritable;  

import org.apache.hadoop.io.NullWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.Job;  

import org.apache.hadoop.mapreduce.Mapper;  

import org.apache.hadoop.mapreduce.Reducer;  

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

import org.apache.hadoop.mapreduce.task.reduce.Shuffle;  

import org.apache.hadoop.util.Tool;  

import org.apache.hadoop.util.ToolRunner;  

  

public class MapReduceJoin extends Configured implements Tool{  

    //定义map处理类模板  

    public static class map extends Mapper<LongWritable, Text, IntWritable, DataJoin>{  

        private IntWritable outputkey = new IntWritable();  

        private DataJoin datajoin = new DataJoin();  

        protected void map(LongWritable key, Text values, Context context)  

                throws IOException, InterruptedException {  

            //1.获取字符串  

            String str = values.toString();  

            //2.对字符串进行分割  

            String[] value = str.split(",");  

            //3.对非法数据进行过滤  

            int len = value.length;  

            if(len!=3&&len!=4) {  

                return;  

            }  

            //4.取出cid  

            String cid = value[0];  

            //5.判断是是customer表还是order  

            if(len == 3) {  

                //表示是customer  

                String cname = value[1];  

                String cphone = value[2];  

                datajoin.set("Customer", cid+","+cname+","+cphone);  

            }  

            if(len == 4) {  

                //表示是order  

                String oname = value[1];  

                String oprice = value[2];  

                String otime = value[3];  

                datajoin.set("Order", cid+","+oname+","+oprice+","+otime);  

            }  

            outputkey.set(Integer.valueOf(cid));  

            context.write(outputkey, datajoin);  

        }  

    }  

      

    //定义reduce处理类模板  

    public static class reduce extends Reducer<IntWritable, DataJoin, NullWritable, Text>{  

          

        private Text outputvalue = new Text();  

        @Override  

        protected void reduce(IntWritable key, Iterable<DataJoin> values,  

                Context context) throws IOException, InterruptedException {  

              

            //定义一个字符串用于保存客户信息  

            String customerInfo = null;  

            //定义一个List集合用于保存订单信息  

            List<String> list = new ArrayList<String>();  

            for(DataJoin datajoin : values) {  

                if(datajoin.getTag().equals("Customer")) {  

                    System.out.println(datajoin.getData());  

                    customerInfo = datajoin.getData();  

                }  

                if(datajoin.getTag().equals("Order")) {  

                    list.add(datajoin.getData());  

                }  

            }  

            //进行输出  

            for(String s :list) {  

                outputvalue.set(customerInfo+","+s);  

                context.write(NullWritable.get(), outputvalue);  

            }  

        }  

    }  

    //配置Driver模块  

    public int run(String[] args) {  

          

        //1.获取配置配置文件对象  

        Configuration configuration = new Configuration();  

        //2.创建给mapreduce处理的任务  

        Job job = null;  

        try {  

            job = Job.getInstance(configuration,this.getClass().getSimpleName());  

        } catch (IOException e) {  

            e.printStackTrace();  

        }  

        try {  

            //3.创建输入路径  

            Path source_path = new Path(args[0]);  

            FileInputFormat.addInputPath(job, source_path);  

            //4.创建输出路径  

            Path des_path = new Path(args[1]);  

            FileOutputFormat.setOutputPath(job, des_path);  

        } catch (IllegalArgumentException e) {  

            e.printStackTrace();  

        } catch (IOException e) {  

            e.printStackTrace();  

        }  

          

        //设置让任务打包jar运行  

        job.setJarByClass(MapReduceJoin.class);  

        //5.设置map  

        job.setMapperClass(map.class);  

        job.setMapOutputKeyClass(IntWritable.class);  

        job.setMapOutputValueClass(DataJoin.class);  

          

        //================shuffle========================  

        //1.分区  

//      job.setPartitionerClass(MyPartitioner.class);  

        //2.排序  

//      job.setSortComparatorClass(cls);  

        //3.分组  

//      job.setGroupingComparatorClass(MyGroup.class);  

        //4.可选项,设置combiner,相当于map过程的reduce处理,优化选项  

//      job.setCombinerClass(Combiner.class);  

        //设置reduce个数  

//      job.setNumReduceTasks(2);  

        //================shuffle========================  

        //6.设置reduce  

        job.setReducerClass(reduce.class);  

        job.setOutputKeyClass(NullWritable.class);  

        job.setOutputValueClass(Text.class);        

        //7.提交jobyarn组件上  

        boolean isSuccess = false;  

        try {  

            isSuccess = job.waitForCompletion(true);  

        } catch (ClassNotFoundException e) {  

            e.printStackTrace();  

        } catch (IOException e) {  

            e.printStackTrace();  

        } catch (InterruptedException e) {  

            e.printStackTrace();  

        }  

        return isSuccess?0:1;  

    }   

    //书写主函数 

    public static void main(String[] args) {  

        Configuration configuration = new Configuration();  

        //1.书写输入和输出路径  

        String[] args1 = new String[] {  

                "hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input",  

                "hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/output"  

        };    

        //2.设置系统以什么用户执行job任务  

        System.setProperty("HADOOP_USER_NAME""beifeng");  

        //3.运行job任务  

        int status = 0;  

        try {  

            status = ToolRunner.run(configuration, new MapReduceJoin(), args1);  

        } catch (Exception e) {  

            e.printStackTrace();  

        }  

//      int status = new MyWordCountMapReduce().run(args1);  

        //4.退出系统  

        System.exit(status);  

    }  

}  

自定义包装类代码:

package com.ibeifeng.mapreduce.join;  

  

import java.io.DataInput;  

import java.io.DataOutput;  

import java.io.IOException; 

import org.apache.hadoop.io.Writable;  

public class DataJoin implements Writable{  

    private String tag;  

    private String data;  

  

    public String getTag() {  

        return tag;  

    } 

    public String getData() {  

        return data;  

    } 

    public void set(String tag,String data) {  

        this.tag = tag;  

        this.data = data;  

    } 

    @Override  

    public String toString() {  

        return tag+","+data;  

    } 

    public void write(DataOutput out) throws IOException {  

        out.writeUTF(this.tag);  

        out.writeUTF(this.data);  

    }    

    public void readFields(DataInput in) throws IOException {  

        this.tag = in.readUTF();  

        this.data = in.readUTF();  

    } 

}  

 

准备测试数据如下(两个csv文件):

Map Join和Reduce Join的区别以及代码实现

 

Map Join和Reduce Join的区别以及代码实现

 

将csv文件上传至HDFS当中,并且将代码打包成jar,然后执行以下命令:

bin/yarn jar datas/mapreduce_join.jar /user/beifeng/wordcount/input/ /user/beifeng/wordcount/output

Map Join和Reduce Join的区别以及代码实现

结果如下:

Map Join和Reduce Join的区别以及代码实现

 

 

Map join

MapJoin 适用于有一份数据较小的连接情况。做法是直接把该小份数据直接全部加载到内存当中,按链接关键字建立索引。然后大份数据就作为 MapTask 的输入,对 map()方法的每次输入都去内存当中直接去匹配连接。然后把连接结果按 key 输出,这种方法要使用 hadoop中的 DistributedCache 把小份数据分布到各个计算节点,每个 maptask 执行任务的节点都需要加载该数据到内存,并且按连接关键字建立索引:

这里假设Customer为小表,Orders为大表,这也符合实际生产环境。

 

关于这种分布式缓存的用法,直接看下代码的演示:

主函数入口代码:

package com.ibeifeng.mapreduce.join;  

  

import java.io.BufferedReader;  

import java.io.IOException;  

import java.io.InputStreamReader;  

import java.net.URI;  

import java.util.HashMap;  

import java.util.Map;  

  

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.conf.Configured;  

import org.apache.hadoop.fs.FSDataInputStream;  

import org.apache.hadoop.fs.FileSystem;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.io.LongWritable;  

import org.apache.hadoop.io.NullWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.Job;  

import org.apache.hadoop.mapreduce.Mapper;  

import org.apache.hadoop.mapreduce.Reducer;  

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

import org.apache.hadoop.util.Tool;  

import org.apache.hadoop.util.ToolRunner;  

  

import javax.jdo.annotations.Order;  

  

public class MapJoin extends Configured implements Tool{  

    //定义缓存文件的读取路径  

    private static String  cacheFile = "hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input1/customers.csv";  

    //定义map处理类模板  

    public static class map extends Mapper<LongWritable, Text, NullWritable, Text>{  

        private Text outputValue = new Text();  

        Map<Integer,Customer> map = null;       

        @Override  

        protected void setup(Context context)throws IOException, InterruptedException {  

            //读取分布式缓存文件  

            FileSystem fs = FileSystem.get(URI.create(cacheFile),context.getConfiguration());  

            FSDataInputStream fdis  = fs.open(new Path(cacheFile));  

            BufferedReader br = new BufferedReader(new InputStreamReader(fdis));  

            //创建一个map集合来保存读取文件的数据  

            map = new HashMap<Integer,Customer>();  

            String line = null;  

            while((line = br.readLine())!=null) {  

                String[] split = line.split(",");  

                Customer   customer  = new Customer(Integer.parseInt(split[0]), split[1], split[2]);  

                map.put(customer.getCid(),customer);  

            }  

            //关闭IO  

            br.close();  

        }  

        @Override  

        protected void map(LongWritable key, Text values, Context context)  

                throws IOException, InterruptedException {  

            //Customer表和Orders表的数据进行组合  

            String str = values.toString();  

            String[] Orders = str.split(",");  

            int joinID = Integer.valueOf(Orders[0]);  

            Customer customerid = map.get(joinID);  

            StringBuffer sbf = new StringBuffer();  

            sbf.append(Orders[0]).append(",")  

                    .append(customerid.getCname()).append(",")  

                    .append(customerid.getCphone()).append(",")  

                    .append(Orders[1]).append(",")  

                    .append(Orders[2]).append(",")  

                    .append(Orders[3]).append(",");  

            outputValue.set(sbf.toString());  

            context.write(NullWritable.get(),outputValue);  

    }  

    }       

    //reduce程序  

          

    //配置Driver模块  

    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  

          

        //获取配置配置文件对象  

        Configuration configuration = new Configuration();  

        //创建给mapreduce处理的任务  

        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());  

        //获取将要读取到内存的文件的路径,并加载进内存  

        job.addCacheFile(URI.create(cacheFile));  

        //创建输入路径  

        Path source_path = new Path(args[0]);  

        //创建输出路径  

        Path des_path = new Path(args[1]);  

        //创建操作hdfsFileSystem对象  

        FileSystem fs = FileSystem.get(configuration);  

        if (fs.exists(des_path)) {  

            fs.delete(des_path,true);  

        }  

        FileInputFormat.addInputPath(job, source_path);  

        FileOutputFormat.setOutputPath(job, des_path);  

        //设置让任务打包jar运行  

        job.setJarByClass(MapJoin.class);  

        //设置map  

        job.setMapperClass(map.class);  

        job.setMapOutputKeyClass(NullWritable.class);  

        job.setMapOutputValueClass(Text.class);  

        //设置reduceTask的任务数为0,即没有reduce阶段和shuffle阶段  

        job.setNumReduceTasks(0);  

          

        //提交jobyarn组件上  

        boolean isSuccess = job.waitForCompletion(true);  

        return isSuccess?0:1;  

    }  

    //书写主函数   

    public static void main(String[] args) {  

        Configuration configuration = new Configuration();  

        //1.书写输入和输出路径  

        String[] args1 = new String[] {  

                "hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input",  

                "hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/output"  

        };  

        //2.设置系统以什么用户执行job任务  

        System.setProperty("HADOOP_USER_NAME""beifeng");  

        //3.运行job任务  

        int status = 0;  

        try {  

            status = ToolRunner.run(configuration, new MapJoin(), args1);  

        } catch (Exception e) {  

            e.printStackTrace();  

        }  

//      int status = new MyWordCountMapReduce().run(args1);  

        //4.退出系统  

        System.exit(status);  

    }  

}  

构造类代码:

package com.ibeifeng.mapreduce.join;  

  

import java.io.DataInput;  

import java.io.DataOutput;  

import java.io.IOException;  

import org.apache.hadoop.io.Writable;  

public class Customer implements Writable{  

    private int cid;  

    private String cname;  

    private String cphone; 

    public int getCid() {  

        return cid;  

    } 

    public void setCid(int cid) {  

        this.cid = cid;  

    }  

    public String getCname() {  

        return cname;  

    }  

    public void setCname(String cname) {  

        this.cname = cname;  

    }  

    public String getCphone() {  

        return cphone;  

    }  

    public void setCphone(String cphone) {  

        this.cphone = cphone;  

    }       

    public Customer(int cid, String cname, String cphone) {  

        super();  

        this.cid = cid;  

        this.cname = cname;  

        this.cphone = cphone;  

    }  

    public void write(DataOutput out) throws IOException {  

        out.writeInt(this.cid);  

        out.writeUTF(this.cname);  

        out.writeUTF(this.cphone);  

    }  

  

    public void readFields(DataInput in) throws IOException {  

        this.cid = in.readInt();  

        this.cname = in.readUTF();  

        this.cphone = in.readUTF();  

    }  

  @Override  

    public String toString() {  

        return "Customer [cid=" + cid + ", cname=" + cname + ", cphone=" + cphone + "]";  

    }       

}  

 

Map Join和Reduce Join的区别以及代码实现

 

Map Join和Reduce Join的区别以及代码实现

 

执行命令:bin/yarn jar datas/map_join.jar也是可以得到同样的结果:

Map Join和Reduce Join的区别以及代码实现