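Hadoop--MapReduce2--Custom Serializable Data Types
In the MapReduce programming model, both the map phase and the reduce phase serialize and deserialize their input and output data, so the key and value types must implement a specific interface, Writable (keys must also be comparable, i.e. implement WritableComparable). Hadoop already wraps the common basic types, for example Text, LongWritable, IntWritable, FloatWritable, and NullWritable.
The Writable interface is defined as follows: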
@Public
@Stable
public interface Writable {
    void write(DataOutput var1) throws IOException;
    void readFields(DataInput var1) throws IOException;
}
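A custom data type only needs to implement the write and readFields methods. The write method serializes the object's fields to a DataOutput, which provides one method per primitive type (writeBoolean, writeByte, writeShort, writeChar, writeInt, writeLong, writeFloat, writeDouble) plus writeBytes, writeChars, and writeUTF for strings.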
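Write each field with the method that matches its type. Strings should be written with writeUTF: it prefixes the encoded bytes with their length, so readUTF knows exactly how many bytes to consume during deserialization. Comparing writeBytes and writeUTF on a string whose encoding occupies 11 bytes shows the difference: the writeUTF output carries two extra leading bytes, 0 and 11.
These two bytes are not a start marker followed by a length. Together they form a single 2-byte big-endian length (0x000B = 11) of the modified UTF-8 data that follows.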
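The original test output is not reproduced here, so the following is a minimal sketch of such a comparison using only java.io. The class name WriteUtfDemo and the sample string "hello world" (11 ASCII characters) are assumptions chosen for illustration, not the author's test.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class WriteUtfDemo {
    // writeUTF: 2-byte big-endian length prefix, then modified UTF-8 bytes.
    public static byte[] utfBytes(String s) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(s);
        }
        return buf.toByteArray();
    }

    // writeBytes: the low byte of each char, with no length prefix at all.
    public static byte[] rawBytes(String s) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeBytes(s);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        String sample = "hello world"; // hypothetical sample, 11 ASCII characters
        System.out.println("writeBytes: " + Arrays.toString(rawBytes(sample)));
        System.out.println("writeUTF:   " + Arrays.toString(utfBytes(sample)));
    }
}
```

For this sample the writeUTF array is two bytes longer than the writeBytes array, and its first two entries are 0 and 11. Without that prefix a reader has no way to tell where the string ends and the next field begins, which is why writeBytes is unsuitable for variable-length fields.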
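Key points when implementing Writable:
1. Write each field to the DataOutput byte stream with the method that matches its type.
2. write and readFields must handle the fields in exactly the same order.
3. The custom bean must keep a no-argument constructor, because the framework creates the instance by reflection before calling readFields.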
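The three points can be checked outside a cluster with a small round trip. The sketch below assumes a local stand-in interface, MiniWritable, with the same two methods as Hadoop's Writable so that it runs with java.io alone; RoundTripDemo and Record are likewise hypothetical names, and the reflective instantiation only imitates what the framework does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTripDemo {
    /** Local stand-in for org.apache.hadoop.io.Writable (assumption, keeps the sketch Hadoop-free). */
    interface MiniWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    public static class Record implements MiniWritable {
        public int upFlow;
        public String phone;
        public int dFlow;

        public Record() {} // point 3: required for reflective instantiation

        public Record(String phone, int upFlow, int dFlow) {
            this.phone = phone;
            this.upFlow = upFlow;
            this.dFlow = dFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(upFlow);   // point 1: int -> writeInt
            out.writeUTF(phone);    // point 1: String -> writeUTF
            out.writeInt(dFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readInt();  // point 2: same order as write
            phone = in.readUTF();
            dFlow = in.readInt();
        }
    }

    // Serialize src, then rebuild a fresh object from the bytes via the no-arg constructor.
    public static Record roundTrip(Record src) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        src.write(new DataOutputStream(buf));
        Record copy = Record.class.getDeclaredConstructor().newInstance();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws Exception {
        Record copy = roundTrip(new Record("13502468823", 7335, 110349));
        System.out.println(copy.phone + "," + copy.upFlow + "," + copy.dFlow);
    }
}
```

Swapping two lines in readFields, or removing the no-argument constructor, makes this round trip fail, which is a quick way to see why each rule matters.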
Requirement: for the file below, compute each user's total upstream traffic, total downstream traffic, and overall total traffic.
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
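The map phase reads each line, splits it into fields, uses the phone number as the key, and wraps the upstream traffic, downstream traffic, and total traffic in a bean that is written to the Context.
The reduce phase aggregates the records that share a phone number and sums them to obtain that number's total upstream, total downstream, and overall total traffic.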
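Before moving to the Hadoop classes, the same map-then-reduce logic can be sketched in plain Java. This is only an illustration under stated assumptions: FlowAggregationSketch is a hypothetical name, the lines are split on any whitespace, the sample lines in main are abbreviated from the data above (the category column is omitted), and the traffic fields are located by counting from the end of the line.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlowAggregationSketch {
    // Returns phone -> {upstream sum, downstream sum, total}.
    public static Map<String, long[]> aggregate(String[] lines) {
        Map<String, long[]> totals = new LinkedHashMap<>();
        for (String line : lines) {
            // "map" step: split the line and pick out key and values
            String[] fields = line.trim().split("\\s+");
            String phone = fields[1];
            long up = Long.parseLong(fields[fields.length - 3]);
            long down = Long.parseLong(fields[fields.length - 2]);

            // "reduce" step: accumulate per phone number
            long[] sums = totals.computeIfAbsent(phone, k -> new long[3]);
            sums[0] += up;
            sums[1] += down;
            sums[2] += up + down;
        }
        return totals;
    }

    public static void main(String[] args) {
        String[] lines = {
            "1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 57 102 7335 110349 200",
            "1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 21 18 9531 2412 200",
            "1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 69 63 11058 48243 200"
        };
        for (Map.Entry<String, long[]> e : aggregate(lines).entrySet()) {
            long[] s = e.getValue();
            System.out.println(e.getKey() + "," + s[0] + "," + s[1] + "," + s[2]);
        }
    }
}
```

In the real job the grouping by key is done by the shuffle between the two phases rather than by an in-memory map, which is what lets it scale beyond a single machine.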
Wrapping the data in a custom bean
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {
    private int upFlow;
    private int dFlow;
    private String phone;
    private int amountFlow;

    // No-argument constructor, required so the framework can instantiate the bean by reflection
    public FlowBean() {}

    public FlowBean(String phone, int upFlow, int dFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.amountFlow = upFlow + dFlow;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public int getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(int upFlow) {
        this.upFlow = upFlow;
    }

    public int getdFlow() {
        return dFlow;
    }

    public void setdFlow(int dFlow) {
        this.dFlow = dFlow;
    }

    public int getAmountFlow() {
        return amountFlow;
    }

    public void setAmountFlow(int amountFlow) {
        this.amountFlow = amountFlow;
    }

    /**
     * Called by Hadoop when it serializes an object of this class.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeUTF(phone);
        out.writeInt(dFlow);
        out.writeInt(amountFlow);
    }

    /**
     * Called by Hadoop when it deserializes an object of this class.
     * Fields are read in the same order in which write() emits them.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readInt();
        this.phone = in.readUTF();
        this.dFlow = in.readInt();
        this.amountFlow = in.readInt();
    }

    // Used by the default TextOutputFormat when the value is written to the output file
    @Override
    public String toString() {
        return this.phone + "," + this.upFlow + "," + this.dFlow + "," + this.amountFlow;
    }
}
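Implementing the Mapper and Reducer
// FlowCountMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // The log file is tab-separated
        String[] fields = line.split("\t");
        String phone = fields[1];
        // Count from the end of the line, because some middle columns may be missing
        int upFlow = Integer.parseInt(fields[fields.length - 3]);
        int dFlow = Integer.parseInt(fields[fields.length - 2]);
        context.write(new Text(phone), new FlowBean(phone, upFlow, dFlow));
    }
}

// FlowCountReducer.java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // int is enough for the sample data; larger inputs may need long to avoid overflow
        int upSum = 0;
        int dSum = 0;
        for (FlowBean value : values) {
            upSum += value.getUpFlow();
            dSum += value.getdFlow();
        }
        context.write(key, new FlowBean(key.toString(), upSum, dSum));
    }
}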
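Submitting the job to the local job runner
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitter {
    public static void main(String[] args) throws Exception {
        // With a default Configuration the job runs in the local job runner
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubmitter.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        job.setNumReduceTasks(1);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\flow\\input"));
        // The output directory must not exist before the job starts
        FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\flow\\output"));
        // Block until the job finishes and report success or failure through the exit code
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}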
Run log:
[INFO ] 2019-02-27 20:24:44,682 method:org.apache.hadoop.conf.Configuration.warnOnceIfDeprecated(Configuration.java:1181)
session.id is deprecated. Instead, use dfs.metrics.session-id
[INFO ] 2019-02-27 20:24:44,689 method:org.apache.hadoop.metrics.jvm.JvmMetrics.init(JvmMetrics.java:79)
Initializing JVM Metrics with processName=JobTracker, sessionId=
[WARN ] 2019-02-27 20:24:46,541 method:org.apache.hadoop.mapreduce.JobResourceUploader.uploadFiles(JobResourceUploader.java:64)
Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
[WARN ] 2019-02-27 20:24:46,588 method:org.apache.hadoop.mapreduce.JobResourceUploader.uploadFiles(JobResourceUploader.java:171)
No job jar file set. User classes may not be found. See Job or Job#setJar(String).
[INFO ] 2019-02-27 20:24:46,924 method:org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:289)
Total input files to process : 1
[INFO ] 2019-02-27 20:24:47,083 method:org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:200)
number of splits:1
[INFO ] 2019-02-27 20:24:47,355 method:org.apache.hadoop.mapreduce.JobSubmitter.printTokens(JobSubmitter.java:289)
Submitting tokens for job: job_local861456068_0001
[INFO ] 2019-02-27 20:24:47,697 method:org.apache.hadoop.mapreduce.Job.submit(Job.java:1345)
The url to track the job: http://localhost:8080/
[INFO ] 2019-02-27 20:24:47,700 method:org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1390)
Running job: job_local861456068_0001
[INFO ] 2019-02-27 20:24:47,702 method:org.apache.hadoop.mapred.LocalJobRunner$Job.createOutputCommitter(LocalJobRunner.java:498)
OutputCommitter set in config null
[INFO ] 2019-02-27 20:24:47,716 method:org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:123)
File Output Committer Algorithm version is 1
[INFO ] 2019-02-27 20:24:47,717 method:org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:138)
FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
[INFO ] 2019-02-27 20:24:47,718 method:org.apache.hadoop.mapred.LocalJobRunner$Job.createOutputCommitter(LocalJobRunner.java:516)
OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
[INFO ] 2019-02-27 20:24:47,814 method:org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:475)
Waiting for map tasks
[INFO ] 2019-02-27 20:24:47,816 method:org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:251)
Starting task: attempt_local861456068_0001_m_000000_0
[INFO ] 2019-02-27 20:24:47,881 method:org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:123)
File Output Committer Algorithm version is 1
[INFO ] 2019-02-27 20:24:47,884 method:org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:138)
FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
[INFO ] 2019-02-27 20:24:47,905 method:org.apache.hadoop.yarn.util.ProcfsBasedProcessTree.isAvailable(ProcfsBasedProcessTree.java:168)
ProcfsBasedProcessTree currently is supported only on Linux.
[INFO ] 2019-02-27 20:24:48,057 method:org.apache.hadoop.mapred.Task.initialize(Task.java:619)
Using ResourceCalculatorProcessTree : [email protected]
[INFO ] 2019-02-27 20:24:48,073 method:org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
Processing split: file:/F:/hadoop-2.8.1/data/flow/input/flow.log:0+2226
[INFO ] 2019-02-27 20:24:48,238 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.setEquator(MapTask.java:1205)
(EQUATOR) 0 kvi 26214396(104857584)
[INFO ] 2019-02-27 20:24:48,238 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:998)
mapreduce.task.io.sort.mb: 100
[INFO ] 2019-02-27 20:24:48,239 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:999)
soft limit at 83886080
[INFO ] 2019-02-27 20:24:48,239 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1000)
bufstart = 0; bufvoid = 104857600
[INFO ] 2019-02-27 20:24:48,239 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1001)
kvstart = 26214396; length = 6553600
[INFO ] 2019-02-27 20:24:48,244 method:org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:403)
Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
[INFO ] 2019-02-27 20:24:48,263 method:org.apache.hadoop.mapred.LocalJobRunner$Job.statusUpdate(LocalJobRunner.java:618)
[INFO ] 2019-02-27 20:24:48,263 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1462)
Starting flush of map output
[INFO ] 2019-02-27 20:24:48,264 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1484)
Spilling map output
[INFO ] 2019-02-27 20:24:48,264 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1485)
bufstart = 0; bufend = 808; bufvoid = 104857600
[INFO ] 2019-02-27 20:24:48,264 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1487)
kvstart = 26214396(104857584); kvend = 26214312(104857248); length = 85/6553600
[INFO ] 2019-02-27 20:24:48,308 method:org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1669)
Finished spill 0
[INFO ] 2019-02-27 20:24:48,322 method:org.apache.hadoop.mapred.Task.done(Task.java:1099)
Task:attempt_local861456068_0001_m_000000_0 is done. And is in the process of committing
[INFO ] 2019-02-27 20:24:48,361 method:org.apache.hadoop.mapred.LocalJobRunner$Job.statusUpdate(LocalJobRunner.java:618)
map
[INFO ] 2019-02-27 20:24:48,361 method:org.apache.hadoop.mapred.Task.sendDone(Task.java:1219)
Task 'attempt_local861456068_0001_m_000000_0' done.
[INFO ] 2019-02-27 20:24:48,361 method:org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:276)
Finishing task: attempt_local861456068_0001_m_000000_0
[INFO ] 2019-02-27 20:24:48,361 method:org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:483)
map task executor complete.
[INFO ] 2019-02-27 20:24:48,367 method:org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:475)
Waiting for reduce tasks
[INFO ] 2019-02-27 20:24:48,367 method:org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:329)
Starting task: attempt_local861456068_0001_r_000000_0
[INFO ] 2019-02-27 20:24:48,381 method:org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:123)
File Output Committer Algorithm version is 1
[INFO ] 2019-02-27 20:24:48,382 method:org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:138)
FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
[INFO ] 2019-02-27 20:24:48,383 method:org.apache.hadoop.yarn.util.ProcfsBasedProcessTree.isAvailable(ProcfsBasedProcessTree.java:168)
ProcfsBasedProcessTree currently is supported only on Linux.
[INFO ] 2019-02-27 20:24:48,541 method:org.apache.hadoop.mapred.Task.initialize(Task.java:619)
Using ResourceCalculatorProcessTree : [email protected]
[INFO ] 2019-02-27 20:24:48,549 method:org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:362)
Using ShuffleConsumerPlugin: [email protected]
[INFO ] 2019-02-27 20:24:48,577 method:org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.<init>(MergeManagerImpl.java:206)
MergerManager: memoryLimit=1323407744, maxSingleShuffleLimit=330851936, mergeThreshold=873449152, ioSortFactor=10, memToMemMergeOutputsThreshold=10
[INFO ] 2019-02-27 20:24:48,582 method:org.apache.hadoop.mapreduce.task.reduce.EventFetcher.run(EventFetcher.java:61)
attempt_local861456068_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
[INFO ] 2019-02-27 20:24:48,668 method:org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.copyMapOutput(LocalFetcher.java:145)
localfetcher#1 about to shuffle output of map attempt_local861456068_0001_m_000000_0 decomp: 854 len: 858 to MEMORY
[INFO ] 2019-02-27 20:24:48,694 method:org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:93)
Read 854 bytes from map-output for attempt_local861456068_0001_m_000000_0
[INFO ] 2019-02-27 20:24:48,698 method:org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.closeInMemoryFile(MergeManagerImpl.java:321)
closeInMemoryFile -> map-output of size: 854, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->854
[INFO ] 2019-02-27 20:24:48,701 method:org.apache.hadoop.mapreduce.task.reduce.EventFetcher.run(EventFetcher.java:76)
EventFetcher is interrupted.. Returning
[INFO ] 2019-02-27 20:24:48,703 method:org.apache.hadoop.mapred.LocalJobRunner$Job.statusUpdate(LocalJobRunner.java:618)
1 / 1 copied.
[INFO ] 2019-02-27 20:24:48,703 method:org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:693)
finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
[INFO ] 2019-02-27 20:24:48,706 method:org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1411)
Job job_local861456068_0001 running in uber mode : false
[INFO ] 2019-02-27 20:24:48,710 method:org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1418)
map 100% reduce 0%
[INFO ] 2019-02-27 20:24:48,736 method:org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:606)
Merging 1 sorted segments
[INFO ] 2019-02-27 20:24:48,738 method:org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:705)
Down to the last merge-pass, with 1 segments left of total size: 840 bytes
[INFO ] 2019-02-27 20:24:48,742 method:org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:760)
Merged 1 segments, 854 bytes to disk to satisfy reduce memory limit
[INFO ] 2019-02-27 20:24:48,747 method:org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:790)
Merging 1 files, 858 bytes from disk
[INFO ] 2019-02-27 20:24:48,747 method:org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:805)
Merging 0 segments, 0 bytes from memory into reduce
[INFO ] 2019-02-27 20:24:48,747 method:org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:606)
Merging 1 sorted segments
[INFO ] 2019-02-27 20:24:48,750 method:org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:705)
Down to the last merge-pass, with 1 segments left of total size: 840 bytes
[INFO ] 2019-02-27 20:24:48,751 method:org.apache.hadoop.mapred.LocalJobRunner$Job.statusUpdate(LocalJobRunner.java:618)
1 / 1 copied.
[INFO ] 2019-02-27 20:24:48,763 method:org.apache.hadoop.conf.Configuration.warnOnceIfDeprecated(Configuration.java:1181)
mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
[INFO ] 2019-02-27 20:24:48,779 method:org.apache.hadoop.mapred.Task.done(Task.java:1099)
Task:attempt_local861456068_0001_r_000000_0 is done. And is in the process of committing
[INFO ] 2019-02-27 20:24:48,783 method:org.apache.hadoop.mapred.LocalJobRunner$Job.statusUpdate(LocalJobRunner.java:618)
1 / 1 copied.
[INFO ] 2019-02-27 20:24:48,785 method:org.apache.hadoop.mapred.Task.commit(Task.java:1260)
Task attempt_local861456068_0001_r_000000_0 is allowed to commit now
[INFO ] 2019-02-27 20:24:48,794 method:org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:582)
Saved output of task 'attempt_local861456068_0001_r_000000_0' to file:/F:/hadoop-2.8.1/data/flow/output/_temporary/0/task_local861456068_0001_r_000000
[INFO ] 2019-02-27 20:24:48,796 method:org.apache.hadoop.mapred.LocalJobRunner$Job.statusUpdate(LocalJobRunner.java:618)
reduce > reduce
[INFO ] 2019-02-27 20:24:48,797 method:org.apache.hadoop.mapred.Task.sendDone(Task.java:1219)
Task 'attempt_local861456068_0001_r_000000_0' done.
[INFO ] 2019-02-27 20:24:48,798 method:org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:352)
Finishing task: attempt_local861456068_0001_r_000000_0
[INFO ] 2019-02-27 20:24:48,798 method:org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:483)
reduce task executor complete.
[INFO ] 2019-02-27 20:24:49,711 method:org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1418)
map 100% reduce 100%
[INFO ] 2019-02-27 20:24:49,712 method:org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1429)
Job job_local861456068_0001 completed successfully
[INFO ] 2019-02-27 20:24:49,732 method:org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1436)
Counters: 30
File System Counters
FILE: Number of bytes read=6534
FILE: Number of bytes written=640226
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=808
Map output materialized bytes=858
Input split bytes=111
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=858
Reduce input records=22
Reduce output records=21
Spilled Records=44
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=11
Total committed heap usage (bytes)=468713472
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2226
File Output Format Counters
Bytes Written=816