11.mr案例:流量分析

1.数据

11.mr案例:流量分析

2.需求:

统计总流量:上行流量+下行流量

3.Mapper

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
	/**
	 * Parses one tab-separated input record and emits
	 * (phone number, FlowBean(upFlow, downFlow)).
	 *
	 * Assumed record layout (from the sample "13726130503  299 681 980"):
	 * phone number at index 1, up-flow and down-flow counted from the end
	 * of the record — TODO confirm against the actual input data.
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Split the raw line into its tab-separated fields.
		String[] columns = value.toString().split("\t");

		// Extract the key fields: phone number plus the two flow counters,
		// addressed from the end so trailing layout stays stable.
		String phone = columns[1];
		long up = Long.parseLong(columns[columns.length - 3]);
		long down = Long.parseLong(columns[columns.length - 2]);

		// Emit one (phone, flow) pair per input line for the reducer to sum.
		context.write(new Text(phone), new FlowBean(up, down));
	}
}

4.封装的数据类型

/**
 * Hadoop {@code Writable} carrying one subscriber's upstream traffic,
 * downstream traffic, and their precomputed total.
 *
 * The field order in {@link #write(DataOutput)} and
 * {@link #readFields(DataInput)} must match exactly, because Hadoop
 * serialization is positional.
 */
public class FlowBean implements Writable{
	
	private long upFlow;   // upstream traffic
	private long dfFlow;   // downstream traffic
	private long flowSum;  // invariant: upFlow + dfFlow
	
	
	// No-arg constructor required by Hadoop for reflective instantiation.
	public FlowBean() {}
	
	/** Builds a bean and derives the total from the two flow values. */
	public FlowBean(long upFlow, long dfFlow) {
		this.upFlow = upFlow;
		this.dfFlow = dfFlow;
		this.flowSum = upFlow + dfFlow;
	}
	

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
		// Keep the total consistent; previously the setter left flowSum stale.
		this.flowSum = this.upFlow + this.dfFlow;
	}

	public long getDfFlow() {
		return dfFlow;
	}

	public void setDfFlow(long dfFlow) {
		this.dfFlow = dfFlow;
		// Keep the total consistent; previously the setter left flowSum stale.
		this.flowSum = this.upFlow + this.dfFlow;
	}

	public long getFlowsum() {
		return flowSum;
	}

	public void setFlowsum(long flowsum) {
		this.flowSum = flowsum;
	}

	/** Deserialization: must read fields in the same order write() emits them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow = in.readLong();
		dfFlow = in.readLong();
		flowSum = in.readLong();
	}

	/** Serialization: field order defines the wire format. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(dfFlow);
		out.writeLong(flowSum);
		
	}

	/** Tab-separated output used as the job's final value format. */
	@Override
	public String toString() {
		return upFlow + "\t" + dfFlow + "\t" + flowSum;
	}
	
	
	
}

5.Reducer

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
	
	/**
	 * Sums all FlowBeans emitted for a single phone number and writes one
	 * aggregated bean per key.
	 */
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Context context)
			throws IOException, InterruptedException {
		// Accumulate the up/down traffic across every record for this phone.
		long totalUp = 0;
		long totalDown = 0;
		for (FlowBean bean : values) {
			totalUp += bean.getUpFlow();
			totalDown += bean.getDfFlow();
		}
		
		// The FlowBean constructor derives the combined total itself.
		context.write(key, new FlowBean(totalUp, totalDown));
	}

}

6.Driver

public class FlowCountDriver {
	/**
	 * Configures and submits the flow-count MapReduce job.
	 *
	 * Input and output paths default to c:/flow1020/in and c:/flow1020/out
	 * but may be overridden with the first two command-line arguments.
	 * The process exit status is 0 on success, 1 on failure.
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// Allow paths to be supplied on the command line; fall back to the
		// original hard-coded locations for backward compatibility.
		String inputPath = args.length > 0 ? args[0] : "c:/flow1020/in";
		String outputPath = args.length > 1 ? args[1] : "c:/flow1020/out";

		// 1. Create the job from a fresh configuration.
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2. Locate the jar via this driver class.
		job.setJarByClass(FlowCountDriver.class);

		// 3. Wire up the custom mapper and reducer.
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// 4. Map-phase output types.
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 5. Reduce-phase (final) output types.
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 6. Input and output locations (the output directory must not exist).
		FileInputFormat.setInputPaths(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		// 7. Submit, wait for completion, and report via the exit status
		//    (previously the code only printed 0/1 instead of exiting with it).
		boolean success = job.waitForCompletion(true);
		System.exit(success ? 0 : 1);
	}
}