11. MapReduce 案例:流量分析
1.数据:制表符(\t)分隔的文本,每行第 2 列为手机号,倒数第 3 列为上行流量,倒数第 2 列为下行流量
2.需求:
统计每个手机号的总流量:总流量 = 上行流量 + 下行流量
3.Mapper
/**
 * Map phase of the flow-statistics job: turns one tab-separated log line into a
 * (phone number, FlowBean) pair.
 *
 * <p>Column 1 holds the phone number. The third- and second-to-last columns hold the upstream
 * and downstream flow. Lines that cannot be parsed are skipped and counted under the
 * {@code FlowCountMapper / MALFORMED_LINES} counter instead of failing the task.
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /** Counter group used to report skipped input lines. */
    private static final String COUNTER_GROUP = "FlowCountMapper";
    /** Counter name for lines that are too short or have non-numeric flow columns. */
    private static final String MALFORMED = "MALFORMED_LINES";

    /** Reused output key (phone number), to avoid allocating a new Text per record. */
    private final Text phone = new Text();
    /** Reused output value, to avoid allocating a new FlowBean per record. */
    private final FlowBean flow = new FlowBean();

    /**
     * Parses one input line and emits (phone number, flow) for it.
     *
     * @param key     byte offset of the line (unused)
     * @param value   one tab-separated input line
     * @param context sink for the emitted (phone number, FlowBean) pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");

        // The flow columns (length-3, length-2) must lie after the phone column (index 1).
        // With fewer than 5 columns they would overlap it, and the phone number itself would
        // be parsed as a flow value. Blank lines also end up here.
        if (fields.length < 5) {
            context.getCounter(COUNTER_GROUP, MALFORMED).increment(1);
            return;
        }

        long upFlow;
        long dfFlow;
        try {
            upFlow = Long.parseLong(fields[fields.length - 3].trim());
            dfFlow = Long.parseLong(fields[fields.length - 2].trim());
        } catch (NumberFormatException e) {
            // Header rows or corrupt records: skip them but keep them visible via the counter.
            context.getCounter(COUNTER_GROUP, MALFORMED).increment(1);
            return;
        }

        phone.set(fields[1]);
        flow.setUpFlow(upFlow);
        flow.setDfFlow(dfFlow);
        flow.setFlowsum(upFlow + dfFlow);
        // Safe to reuse phone/flow: context.write serializes them before the next call.
        context.write(phone, flow);
    }
}
4.封装的数据类型
/**
 * Hadoop Writable value holding one phone number's upstream flow, downstream flow and total flow.
 *
 * <p>Serialized as three longs in the order upFlow, dfFlow, flowSum. {@link #readFields} reads
 * them back in exactly the same order.
 */
public class FlowBean implements Writable {

    /** Upstream flow. */
    private long upFlow;
    /** Downstream flow. */
    private long dfFlow;
    /** Total flow. The two-arg constructor and the flow setters keep it equal to upFlow + dfFlow. */
    private long flowSum;

    /**
     * Creates an all-zero bean. A public no-arg constructor is required because Hadoop creates
     * value instances reflectively and then fills them through {@link #readFields}.
     */
    public FlowBean() {}

    /**
     * Creates a bean with the given flows and their sum as the total.
     *
     * @param upFlow upstream flow
     * @param dfFlow downstream flow
     */
    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = upFlow + dfFlow;
    }

    /** @return the upstream flow */
    public long getUpFlow() {
        return upFlow;
    }

    /**
     * Sets the upstream flow and recomputes the total so that it stays upFlow + dfFlow.
     *
     * @param upFlow new upstream flow
     */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
        this.flowSum = upFlow + this.dfFlow;
    }

    /** @return the downstream flow */
    public long getDfFlow() {
        return dfFlow;
    }

    /**
     * Sets the downstream flow and recomputes the total so that it stays upFlow + dfFlow.
     *
     * @param dfFlow new downstream flow
     */
    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
        this.flowSum = this.upFlow + dfFlow;
    }

    /** @return the total flow */
    public long getFlowsum() {
        return flowSum;
    }

    /**
     * Overwrites the total directly. A later call to {@link #setUpFlow} or {@link #setDfFlow}
     * recomputes it from the two flows.
     *
     * @param flowsum new total flow
     */
    public void setFlowsum(long flowsum) {
        this.flowSum = flowsum;
    }

    /** Deserializes the three fields in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowSum = in.readLong();
    }

    /** Serializes upFlow, dfFlow and flowSum, in that order. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowSum);
    }

    /** Formats the bean as tab-separated upFlow, dfFlow, flowSum (the text written per output line). */
    @Override
    public String toString() {
        return upFlow + "\t" + dfFlow + "\t" + flowSum;
    }
}
5.Reducer
/**
 * Reduce phase of the flow-statistics job: aggregates all flow records of one phone number.
 */
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /**
     * Sums the upstream and downstream flow of every record that shares one phone number. Writes
     * a single FlowBean carrying the two totals and their sum.
     *
     * @param key     the phone number (the map output key)
     * @param values  every FlowBean emitted for that phone number
     * @param context sink for the aggregated (phone number, FlowBean) pair
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;
        // Read the getters immediately instead of keeping references: the framework may hand
        // back the same FlowBean instance on every iteration.
        for (FlowBean bean : values) {
            totalUp = totalUp + bean.getUpFlow();
            totalDown = totalDown + bean.getDfFlow();
        }
        context.write(key, new FlowBean(totalUp, totalDown));
    }
}
6.Driver
/**
 * Configures and submits the flow-statistics MapReduce job.
 *
 * <p>Usage: {@code FlowCountDriver [inputDir [outputDir]]}. Omitted arguments fall back to the
 * original local defaults. The output directory must not already exist, because Hadoop's
 * FileOutputFormat rejects an existing output path.
 */
public class FlowCountDriver {

    /** Input directory used when no first argument is given. */
    private static final String DEFAULT_INPUT = "c:/flow1020/in";
    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "c:/flow1020/out";

    /**
     * Builds the job, runs it to completion and exits with status 0 on success or 1 on failure.
     *
     * @param args optional input directory (args[0]) and output directory (args[1])
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputDir = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputDir = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        // 1. Create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Locate the jar by the driver class
        job.setJarByClass(FlowCountDriver.class);
        // 3. Mapper, combiner and reducer. The reducer only sums upFlow/dfFlow and its input
        // and output types are identical (Text, FlowBean). It can therefore also pre-aggregate
        // map output as a combiner, which shrinks the shuffle without changing the result.
        job.setMapperClass(FlowCountMapper.class);
        job.setCombinerClass(FlowCountReducer.class);
        job.setReducerClass(FlowCountReducer.class);
        // 4. Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5. Final (reduce) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6. Input and output locations
        FileInputFormat.setInputPaths(job, new Path(inputDir));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));
        // 7. Submit and wait. Propagate the result as the process exit status so that scripts
        // and schedulers can detect a failed job.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}