MapReduce去空去重
MapReduce清洗数据
注意:此代码虽能完美实现需求,但还有一些地方需要优化
详细优化内容请访问下方链接,更新时间2019/03/13
https://blog.****.net/weixin_42063239/article/details/88537897
前言
爬虫爬取的数据中会有一些数据有空值或者数据重复,想要得到规范的数据则要进行数据清洗,对保存为csv类型的数据来说可以用python的pandas库进行清洗,无论是去重还是去空几行代码就可以解决,但是对于海量数据来说用MapReduce则是更优的选择,而且马上要参加的比赛也是要求用MapReduce进行清洗,想想也是,如果用pandas一条drop_duplicates()那也没有什么能考察的点了,下面是我在爬取前程无忧职位招聘相关数据后对其进行清洗的过程
目标
对上传的csv文件进行去空去重,并把每个字段的间隔由逗号“,”改为“|”
准备
需要搭建好的hadoop平台,伪分布式亦可,MapReduce代码使用intellij IDEA开发并打出jar包上传到linux中运行
数据
这是部分数据,因为并不是很大只有20M+所以直接用excel打开了,但依然需要一些时间响应
注意:这种操作是错误的!!!如果数据量够大,基本这样操作就GG了,正确操作为在linux系统里用head -100 file_name查看前100行数据,或者用pandas等查看部分数据,这里为了方便所以直接用excel打开
代码部分
创建项目
IDEA新建普通java项目,导入编写MapReduce所需要的jar包注:曾经耗费大量时间去在网上找jar包,****上的jar包都需要用积分去下载而且版本也不一定对,历经千辛万苦从hadoop官网下了几个jar包后发现还是少,最后发现集群中hadoop文件夹下就有,也就是我们解压的那个hadoop的tar包里就有,在hadoop目录下share/hadoop下hdfs等文件夹下都有QAQ
思路
- 实现Mapper和Reducer类并重写map()和reduce()方法
- 在map阶段通过split(",")得到每行数据字段数据的数组
- 用trim()方法去除字符串两端空格并用isEmpty()判断是否有字段的数据为空
- 没有空字段则把数据传入reduce
- reduce输出数据
编写代码
首先我们要创建两个类分别继承Mapper和Reducer类,可以选择新建两个java文件去继承,然后再用一个java文件存放main方法,也可以在main方法的类中创建内部类,效果相同,这里为了方便直接编写一个java文件
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class DropNull {
public static class Map extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//读取传入的value并转为字符串
String line = value.toString();
//判断是否为完整数据
if (isRightData(line)) {
//把“,”替换为“|”
String data = line.replace(",", "|");
//把替换后的文本转型为Text作为key输出到reduce,输出的value为空字符串
context.write(new Text(data), new Text(""));
}
}
/*
*
* 用split()分割,用isEmpty()判断是否为空,如果完整则返回true反之则返回false
*/
boolean isRightData(String info){
boolean flag = true;
String[] items = info.split(",");
for(String item: items){
if (item.trim().isEmpty()){
flag = false;
break;
}
}
return flag;
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
//将输入中的key复制到输出数据上的key上并直接输出
public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException{
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获取job对象
Job job = Job.getInstance();
//通过设置的主类找到上传的jar包
job.setJarByClass(DropNull.class);
//设置Map处理类
job.setMapperClass(DropNull.Map.class);
//设置Combine处理类
job.setCombinerClass(DropNull.Reduce.class);
//设置Reduce处理类
job.setReducerClass(DropNull.Reduce.class);
//设置输出格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置输入和输出路径
Path in = new Path("hdfs://192.168.1.101:9000/in/data.csv");
Path out = new Path("hdfs://192.168.1.101:9000/out");
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
//判断是否完成,完成则结束运行
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
运行
首先用IDEA打出此项目的jar包
点击File——>Project Structure——>Atifacts——>绿色加号——>JAR——>From moudles with...
如图
Main Class选择自己创建的类
OK——>Apply
注意:出现以下错误是因为已经生成过,解决方法在下面
删除src下META-INF下的MANIFEST.MN
然后重新来一遍
成功后Build——>Build Artifact——>选中——>Build
打包完在out——>artifacts中有相应jar包
上传到集群中
启动hadoop相关服务
start-all.sh
hadoop jar hadoop.jar运行jar包
注意:查看hdfs根目录下有没有out文件夹,如果有则要删除再运行
查看/in/下有没有需要清洗的目标文件,如果没有则需要上传
部分清洗结果如图,可以在hdfs中/out下查看清洗完的结果
part-r-00000就是清洗完毕的文件,把他取到linux中
这里不做查看了,因为文件编码是GBK,hadoop默认为UTF-8所以会产生乱码,可以复制到windows下查看