Mapreduce自定义数据类型
Hadoop自带的数据类型:
IntWritable,LongWritable,Text,xxWritable.
某些情况下:使用自定义的数据类型方便一些(类似java中的pojo)。
实现:
实现WritableComparable接口即可。
场景例如:
成绩表:由语文,数学,英文组成。
上传到hdfs上score目录下一个score.txt文件--文件内容如下:
想让按照总成绩进行排名。如果成绩相同,则按照语文,数学,英文来排序。
一、自定义ScoreWritable实现WritableComparable接口:
package com.day07;

import org.apache.hadoop.io.WritableComparable;

import java.io.*;

/**
 * Custom MapReduce key type carrying three subject scores and their cached total.
 *
 * <p>Sort order (used by the MapReduce shuffle): descending by total score, ties
 * broken by chinese, then math — each also descending. If sum, chinese and math
 * are all equal, english is necessarily equal too (sum = chinese + math + english),
 * so no further tie-break is needed.
 */
public class ScoreWritable implements WritableComparable<ScoreWritable> {

    int chinese;
    int math;
    int english;
    int sum; // cached total of the three subjects, set by the 3-arg constructor

    /** No-arg constructor required by Hadoop to instantiate keys during deserialization. */
    public ScoreWritable() {
    }

    /**
     * Builds a key from the three subject scores; the total is computed eagerly.
     *
     * @param chinese chinese score
     * @param math    math score
     * @param english english score
     */
    public ScoreWritable(int chinese, int math, int english) {
        this.chinese = chinese;
        this.math = math;
        this.english = english;
        this.sum = chinese + english + math;
    }

    @Override
    public String toString() {
        return "ScoreWritable{" + "chinese=" + chinese + ", math=" + math + ", english=" + english + ", sum=" + sum + '}';
    }

    public int getChinese() {
        return chinese;
    }

    public void setChinese(int chinese) {
        this.chinese = chinese;
    }

    public int getMath() {
        return math;
    }

    public void setMath(int math) {
        this.math = math;
    }

    public int getEnglish() {
        return english;
    }

    public void setEnglish(int english) {
        this.english = english;
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

    /**
     * Compares keys so that the highest total sorts first, then chinese, then math
     * (all descending). Uses {@link Integer#compare} instead of subtraction, which
     * the original used for the math tie-break and which can overflow for extreme
     * int values.
     *
     * @param that the key to compare against
     * @return negative if {@code this} sorts before {@code that}, positive if after
     */
    @Override
    public int compareTo(ScoreWritable that) {
        // Arguments are swapped (that vs this) to get descending order.
        int cmp = Integer.compare(that.getSum(), this.sum);
        if (cmp == 0) {
            cmp = Integer.compare(that.getChinese(), this.chinese);
        }
        if (cmp == 0) {
            cmp = Integer.compare(that.getMath(), this.math);
        }
        return cmp;
    }

    /**
     * Serialization — Hadoop writes the fields through a DataOutput stream,
     * which is compact and lets the framework control the wire format.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(chinese);
        out.writeInt(math);
        out.writeInt(english);
        out.writeInt(sum);
    }

    /** Deserialization — fields MUST be read in the exact order they were written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.chinese = in.readInt();
        this.math = in.readInt();
        this.english = in.readInt();
        this.sum = in.readInt();
    }
}
注意:
最好实现toString方法。
二、编写ScoreJob类用于测试自定义的ScoreWritable
package com.day07;

import com.day03.MaxSaleJob;
import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * Driver/test job for the custom {@link ScoreWritable} key: reads
 * "chinese,math,english" lines from /score/, emits ScoreWritable keys so the
 * shuffle sorts records by total score (see ScoreWritable.compareTo), and
 * prints the sorted result file to stdout.
 */
public class ScoreJob {

    /**
     * Mapper: parses one CSV line into a ScoreWritable key with a NullWritable
     * value. Sorting happens in the shuffle via the key's compareTo.
     */
    public static class ScoreMapper extends Mapper<LongWritable, Text, ScoreWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] grades = value.toString().split(",");
            ScoreWritable score = new ScoreWritable(
                    Integer.parseInt(grades[0]),
                    Integer.parseInt(grades[1]),
                    Integer.parseInt(grades[2]));
            context.write(score, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));

        // 设置一个任务
        Job job = Job.getInstance(coreSiteConf, "score");
        // 设置job的运行类
        job.setJarByClass(ScoreJob.class);
        //mrdemo/target/mrdemo-1.0-SNAPSHOT.jar
        //job.setJar("mrdemo/target/mrdemo-1.0-SNAPSHOT.jar");

        // 设置Map处理类 (map-only job: the identity reducer just writes sorted keys)
        job.setMapperClass(ScoreMapper.class);
        // map输出类型
        job.setMapOutputKeyClass(ScoreWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 设置任务的输入路径
        FileInputFormat.addInputPath(job, new Path("/score/"));

        FileSystem fileSystem = FileSystem.get(coreSiteConf);
        // Delete stale output so the job does not abort on an existing path.
        if (fileSystem.exists(new Path("/out/"))) {
            fileSystem.delete(new Path("/out/"), true);
        }
        FileOutputFormat.setOutputPath(job, new Path("/out/"));

        // 运行任务
        boolean flag = job.waitForCompletion(true);
        if (flag) {
            // Read the whole result file. The original used a fixed 1024-byte
            // buffer with IOUtils.readFully(open, buffer, 0, open.available()),
            // which throws (or truncates) for outputs larger than 1 KiB and
            // relied on available(), which is not guaranteed to report the full
            // file length. Also closes the stream, which was previously leaked.
            try (FSDataInputStream open = fileSystem.open(new Path("/out/part-r-00000"));
                 ByteArrayOutputStream result = new ByteArrayOutputStream()) {
                byte[] buffer = new byte[4096];
                int read;
                while ((read = open.read(buffer)) != -1) {
                    result.write(buffer, 0, read);
                }
                // Explicit charset: HDFS text output is UTF-8; new String(buffer)
                // used the platform default and printed trailing NUL bytes.
                System.out.println(result.toString("UTF-8"));
            }
        }
    }
}
三、测试结果,类似于以下内容