MapReduce介绍
MapReduce分为两个部分一个是map,一个是reduce
对应的java类分别是mapper和reducer。
MapReduce的特性是对集群内的数据进行并行计算。
MapReduce的初识,例如黄匡内的是气象数据。经过第二步初始映射key是行号,value就是数据库,在经过第三步带有实际业务的映射输出把有用的数据抽出,key是年份,value是温度。然后经过shuttle(洗牌),按照年份进行进行合并也就是分组加排序的过程,把同样的key的value合并到一起并排序。然后经过reduce按照规则取出响应的数据,然后输出。
:
下面开始实践,首先创建一个mapper类。主要是实现上图第二个方块到第四个方块的转换。
public class MaxTemperatureMapper extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
KEYIN, VALUEIN,分别指上图方块二的key类型和value类型。 KEYOUT, VALUEOUT分别指上图方块三的key类型和value类型。
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.metrics2.util.Quantile;
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//温度缺失
private static final int MISSING=9999;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87)=='+')
{
airTemperature = Integer.parseInt(line.substring(88,92));
}
else
{
airTemperature = Integer.parseInt(line.substring(87,92));
}
String quality=line.substring(92,93);
if(airTemperature!=MISSING&&quality.matches("[01459]"))
{
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
在建立一个reducer,主要是实现上图的第四个方块到第五个方块的转换。
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text keyIn, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for(IntWritable value:values)
{
maxValue = Math.max(maxValue, value.get());
}
context.write(keyIn, new IntWritable(maxValue));
}
}
最后创建一个类执行map和reduce
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class MaxTemperature {
public static void main(String[] args) {
try {
//作业:=map任务+reduce任务
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
//添加输入路径可以添加多个
//可以输具体文件也可以实文件夹,但不是递归处理,压缩的可以自动处理
FileInputFormat.addInputPath(job, new Path(args[0]));
//设置输出数据,只能有一个,但不能提前存在,会自动创建
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//设置mapper类
job.setMapperClass(MaxTemperatureMapper.class);
//设置reducer类
job.setReducerClass(MaxTemperatureReducer.class);
//设置输出的key类型
job.setOutputKeyClass(Text.class);
//设置输出的value类型
job.setOutputValueClass(IntWritable.class);
//等待作业完成
System.out.println(job.waitForCompletion(true));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
把上面的三个class导出jar包 hadoop-examples.jar,输入的文件是附件的weather.rar。
win
set HADOOP_CLASSPATH=hadoop-examples.jar
linux
export HADOOP_CLASSPATH=hadoop-examples.jar
执行
hadoop com.xxxxx.hadoop.MaxTemperature file:\\\D:\hadoopdemo\weather\19*.gz D:\hadoopdemo\weather\out
报错:
java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
是Text的包导错了
不是:import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
而是:import org.apache.hadoop.io.Text;
再次执行前一定要把out文件夹删除
执行结果: