Data Cleaning with a MapReduce Program

1. First, prepare the data that needs to be cleaned.
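
For reference, the mapper in step 4 assumes whitespace-separated weather records in which the header line starts with STN, missing values are encoded as 9999.9 / 999.9 / 99.9, and fields may carry a trailing * marker or an A-I quality flag (NOAA GSOD-style data). A hypothetical two-line fragment of such raw data might look like this:

STN--- WBAN   YEARMODA    TEMP     DEWP     SLP        MAX     MIN    PRCP
010010 99999  20190101    26.1 24  19.4 24  9999.9 24  28.4*   23.5*  0.00G
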
2. Import the data into the project and create two folders under it: input (the raw data) and output (the data after cleaning), as shown in the figure below:
(figure: project structure with the input and output folders)
3. Import the required jars:
hadoop-2.8.5\share\hadoop\common\*.jar
hadoop-2.8.5\share\hadoop\common\lib\*.jar
hadoop-2.8.5\share\hadoop\hdfs\*.jar
hadoop-2.8.5\share\hadoop\hdfs\lib\*.jar
hadoop-2.8.5\share\hadoop\mapreduce\*.jar
hadoop-2.8.5\share\hadoop\mapreduce\lib\*.jar
hadoop-2.8.5\share\hadoop\yarn\*.jar
hadoop-2.8.5\share\hadoop\yarn\lib\*.jar
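
If the project is built with Maven instead of adding jars by hand, these libraries can be pulled in with a single dependency, roughly like the sketch below (assuming a standard Maven project):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.5</version>
</dependency>
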

4. The code is as follows:

Mapper (cleaning) class:

package com.stu.mr06;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author xiaowei
 * @date 2019/3/26   -10:29
 * Data cleaning mapper
 */
public class GMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

        // Get the raw record text
        String data = v1.toString();
        // Skip the header line
        if (data.startsWith("STN")) {
            return;
        }
        // Split the record on single spaces
        String[] str = data.split(" ");
        // Build the cleaned record in a new string
        String newWords = "";
        // Walk through the fields
        for (int i = 0; i < str.length; i++) {
            // Drop empty strings produced by consecutive spaces
            if (str[i].equals("")) {
                continue;
            }
            // Strip trailing '*' markers
            if (str[i].endsWith("*")) {
                str[i] = str[i].replace("*", "");
            }
            // Strip trailing A-I quality flags
            if (str[i].matches(".*[A-I]")) {
                str[i] = str[i].substring(0, str[i].length() - 1);
            }
            // Replace missing-value sentinels with 0.0
            if (str[i].equals("9999.9") || str[i].equals("999.9") ||
                    str[i].equals("99.9")) {
                str[i] = "0.0";
            }
            // Append the field; add a '/' separator after every field except the last
            if (i == str.length - 1) {
                newWords += str[i];
            } else {
                newWords += str[i] + "/";
            }
        }
        // Emit the cleaned record as the key; NullWritable cannot be created with new NullWritable(), the singleton must be obtained via NullWritable.get()
        context.write(new Text(newWords),NullWritable.get());
    }
}
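
To sanity-check the cleaning rules without starting a Hadoop job, the same transformations can be tried on a single record in a plain main method. The class name and the sample line below are hypothetical; a minimal sketch:

public class CleanDemo {
    public static void main(String[] args) {
        // Hypothetical GSOD-style record: a missing value (9999.9),
        // a trailing '*' marker and a trailing 'G' quality flag
        String data = "010010 99999 20190101 26.1 24 9999.9 24 28.4* 0.00G";
        String[] str = data.split(" ");
        String newWords = "";
        for (int i = 0; i < str.length; i++) {
            if (str[i].equals("")) {
                continue;                                            // drop empty tokens
            }
            if (str[i].endsWith("*")) {
                str[i] = str[i].replace("*", "");                    // strip '*' markers
            }
            if (str[i].matches(".*[A-I]")) {
                str[i] = str[i].substring(0, str[i].length() - 1);   // strip A-I flags
            }
            if (str[i].equals("9999.9") || str[i].equals("999.9") || str[i].equals("99.9")) {
                str[i] = "0.0";                                      // replace missing values
            }
            newWords += str[i] + (i == str.length - 1 ? "" : "/");
        }
        // Prints: 010010/99999/20190101/26.1/24/0.0/24/28.4/0.00
        System.out.println(newWords);
    }
}
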

Driver (main) class:

package com.stu.mr06;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author xiaowei
 * @date 2019/3/26   -16:18
 * Main driver program
 */
public class GMain {

    public static void main(String[] args) throws Exception {
        // Create a job
        Job job = Job.getInstance(new Configuration());
        // Tell Hadoop which jar contains the driver class
        job.setJarByClass(GMain.class);
        // Set the mapper class and the job's output key/value types
        job.setMapperClass(GMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Set the job's input and output paths
        FileInputFormat.setInputPaths(job,new Path("Hadoop_API/input"));
        FileOutputFormat.setOutputPath(job,new Path("Hadoop_API/output/3265"));
        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
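
With the relative paths used above, the job can be started directly from the IDE, or the project can be packaged into a jar and launched with hadoop jar <jar> com.stu.mr06.GMain. Note that the output directory (Hadoop_API/output/3265 here) must not exist before the run, or FileOutputFormat will refuse to start the job. Because no reducer class is set, Hadoop runs a single default (identity) reducer, so the cleaned records end up in part-r-00000 under the output directory, one slash-separated line per input record; for the hypothetical sample line shown earlier that would be:

010010/99999/20190101/26.1/24/0.0/24/28.4/0.00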