Hadoop生态圈(三):MapReduce组件
目录
2.13 为什么不使用java 的序列化Serializable
2.3 自定义bean对象实现序列化接口(Writable)
3.2.3 CombineTextInputFormat案例
1 MapReduce入门
1.1 MapReduce定义
Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
1.2 MapReduce的优缺点
1. 优点
1.MapReduce 易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。就是因为这个特点使得MapReduce编程变得非常流行。
2.良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3.高容错性
MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
4.适合PB级以上海量数据的离线处理
它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce很难做到。
2. 缺点
MapReduce不擅长做实时计算、流式计算、DAG(有向图)计算。
1. 实时计算
MapReduce无法像Mysql一样,在毫秒或者秒级内返回结果。
2. 流式计算
流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
3. DAG(有向图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
1.3 MapReduce核心思想
- 分布式的运算程序往往至少需要分为两个阶段
- 第一阶段的maptask并发实例,完全并行运行,互不相干
- 第二阶段的reudcetask并发实例互不相干,但是他们的数据依赖于上一个阶段所有maptask并发实例的输出
- MapReudce编程模型只能包含一个map阶段和reduce阶段,如果业务逻辑特别复杂,那就只能多个mapreduce程序串行执行
1.4 MapReduce进程(MR)
一个完整的mapreduce程序在分布式运行时有三类实例进程:
-
MrAppMaster:负责整个程序的过程调度及状态协调。
-
MapTask:负责map阶段的整个数据处理流程。
-
ReduceTask:负责reduce阶段的整个数据处理流程。
1.5 MapReduce编程规范
用户编写的程序分为三个部分:Mapper、Reducer和Driver
- Map阶段:
- 用户自定义的Mapper要继承自己的父类
- Mapper的输入数据时KV对的形式(KV的数据类型可自定义)
- Mapper中的业务逻辑写在map()方法中
- Mapper的输出数据是KV对的形式(KV的数据类型可自定义)
- map()方法(maptask进程)对每一个<K,V>调用一次
- Reduce阶段:
- 用户自定义的Reducer要继承自己的父类
- Reducer的输入数据类型要对应Mapper的输出数据类型,也是KV格式的
- Reducer的业务逻辑写在reduce()方法中
- reducetask进程对每一组相同K的<K,V>组调用一次reduce方法
- Driver阶段(关联Mapper和Reducer,并且提交任务到集群)
相当于yarn集群的客户端,用于提交我们整个程序到yarn集群,提交的是封装了mapreduce程序相关运行参数的job对象
1.6 WordCount案例
1. 需求:dui下面给定的数中统计每一个单词出现的总次数
2. 需求分析:按照mapreduce规范,分别编写Mapper、Reducer、Driver
3. 准备工作
导入下面的依赖,配置文件
<dependencies> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.2</version> </dependency> </dependencies>
配置文件:
log4j.rootLogger=debug, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4. 编写程序
1. Mapper类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取一行的数据
String line = value.toString();
//2.切割
String[] strings = line.split(" ");
//3.输出
for (String string : strings) {
k.set(string);
context.write(k,v);
}
}
}
2.Reducer类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> value,
Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : value) {
sum += count.get();
}
// 2 输出
v.set(sum);
context.write(key,v);
}
}
3.Driver类(注意导入的包是否正确)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.获取配置信息及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//2.设置jar加载路径
job.setJarByClass(WordCountDriver.class);
//3.设置map和reduce类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordcountReducer.class);
//4.设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置reduce输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.设置输入输出的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交
job.waitForCompletion(true);
}
}
4. 本地测试
需要在window上配置HADOOP_HPME的环境变量,然后再IDEA上运行
5. 集群测试
- 将程序打jar包,上传到hadoop集群
- 启动hadoop集群,运行wordcount程序
[[email protected] software]$ hadoop jar wordcount.jar com.bigdata.wordcount.WordcountDriver /user/hadoop/input /user/bigdata/output1 //上传的jar的名称,驱动类的包名+类名,输入路径,输出路径
2 Hadoop序列化
2.1 序列化概述
2.1.1 什么是序列化
序列话就是将内存中的对象,转换成字节序列(或者其他的传输协议)以便于存储(持久化)和网络传输
反序列化就是将收到的字节序列(或其他的传输协议)或者磁盘上持久化的数据。转换为内存中的对象
2.1.2 为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
2.13 为什么不使用java 的序列化Serializable
java的序列化是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验,herder,继承体系等),不便于在网络中高效的传输。所以hadoop自己开发了一套序列化机制(Writable),有以下特点:
紧凑:紧凑的格式能让我们充分利用网络带宽,而网络带宽是数据中心最稀缺的资源
快速:进程通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销
互操作:能支持不同语言写的客户端和服务端进行交互
2.2 常用的数据序列化类型
常用的数据类型对应的hadoop数据序列化类型
Java类型 |
Hadoop Writable类型 |
boolean |
BooleanWritable |
byte |
ByteWritable |
int |
IntWritable |
float |
FloatWritable |
long |
LongWritable |
double |
DoubleWritable |
String |
Text |
map |
MapWritable |
array |
ArrayWritable |
2.3 自定义bean对象实现序列化接口(Writable)
自定义bean对象要想序列化传输,必须实现序列化接口,必须注意以下事项:
- 必须实现Writable接口
- 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
- 重写序列化方法
- 重写反序列化方法
- 注意反序列化的顺序要和序列化的顺序一致
- 要想把结果显示在文件中,需要重写toString()方法,可用 “\t” 分开,方便后续调用
2.4 序列化案例
1. 需求: 统计每一个手机号耗费的总上行流量、下行流量、总流量
输入数据格式: 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 手机号码 上行流量 下行流量 |
输出数据格式 1356·0436666 1116 954 2070 手机号码 总上行流量 总下行流量 总流量 |
数据: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 |
2. 分析
Map阶段:
(1)读取一行数据,切分字段
(2)抽取手机号、上行流量、下行流量
(3)以手机号为key,bean对象为value输出,即context.write(手机号,bean);
Reduce阶段:
(1)累加上行流量和下行流量得到总流量。
(2)实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输
(3) MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key
3. 编写程序
流量统计的bean对象
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
// 1 实现writable接口
public class FlowBean implements WritableComparable{
private long upFlow ;
private long downFlow;
private long sumFlow;
//2 反序列化时,需要反射调用空参构造函数,所以必须有
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
//3 写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//4 反序列化方法
//5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
// 6 编写toString方法,方便后续打印到文本
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
}
Mapper类
import java.io.IOException;
import com.bigdata.mapreduce.flow.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
FlowBean v = new FlowBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割字段
String[] fields = line.split("\t");
// 3 封装对象
// 取出手机号码
String phoneNum = fields[1];
// 取出上行流量和下行流量
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
v.setUpFlow(upFlow);
v.setDownFlow(downFlow);
k.set(phoneNum);
// 4 写出
context.write(k,v);
}
}
Reducer类
import java.io.IOException;
import com.bigdata.mapreduce.flow.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context)
throws IOException, InterruptedException {
long sum_upFlow = 0;
long sum_downFlow = 0;
// 1 遍历所用bean,将其中的上行流量,下行流量分别累加
for (FlowBean flowBean : values) {
sum_upFlow += flowBean.getUpFlow();
sum_downFlow += flowBean.getDownFlow();
}
// 2 封装对象
FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
// 3 写出
context.write(key, resultBean);
}
}
Driver类
import java.io.IOException;
import com.bigdata.mapreduce.flow.FlowBean;
import com.bigdata.mapreduce.flow.FlowCountMapper;
import com.bigdata.mapreduce.flow.FlowCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowsumDriver {
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowsumDriver.class);
// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
}
}
3 MapReduce框架原理
3.1 MapReduce工作流程
流程:
上面的流程是整个mapreduce整个工作流程,shuffle过程需要详细介绍,下面是具体的shuffle过程:
- maptask手机我们的map()方法输出的kv对,放到环形内存缓冲区;
- 内存中的容量达到一定的阈值,不断的溢写到本地磁盘,可能会溢写出多个文件
- 多个小文件会被合并为大的溢出文件
- 在溢出过程及合并的工程中,都要调用partitione进行分区和针对key进行排序
- reducetask根据自己的分区号,去各个maptask进程节点上获取相应的分区数据
- reducetask获取到多个maptask结果文件,将这些文件再次进行合并(归并排序)
- 合并成一个大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程
注意:
shuffle中的环形缓冲区的大小会影响到MR程序的执行效率,原则上说,缓冲区越大,进行磁盘IO的次数越少,执行速度就越快
缓冲区的大小可以通过参数调整,参数:io.sort.mb 默认100M
3.2 InputFormat数据输入
3.2.1 FileInputFormat操作流程
- 找到数据所在目录;
- 开始遍历处理(规划切片)目录下的每一个文件
- 遍历第一个文件xx.txt(假设300M)
- 获取文件的大小fs.sizeOf(xx.txt)
- 默认切片大小=blocksize(128M)
- 开始切片,形成第1个切片信息:xx.txt-0~128M 第2个切片信息:xx.txt-128M~256M 第3个切片信息:xx.txt-256M~300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片,比如说剩下部分大于128m但是小于140m(大概是这个区间))
- 将切片信息写到一个切片规划文件中
- 数据切片只是逻辑上对输入数据进行分片,并不会在磁盘上将文件切分成分片文件进行存储。使用InputSplit只记录了分片的元数据信息,比如某一个切片文件的起始位置、长度以及所在节点等
- block是HDFS物理上存储的数据,切片是对数据逻辑上的划分
- 提交切片规划到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask的个数
3.2.2 FileInputFormat切片机制
1. FileInputFormat中默认的切片机制(底层使用textInputFormat)
- 简单的按照文件的内容长度进行切片
- 切片大小默认等于block大小
- 切片时不考虑数据集整体,逐个针对每一个文件单独切片
比如待处理有两个文件
file1.txt 320M
file2.txt 10M
经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
2. CombineTextInputFormat切片机制
针对大量小文件的优化策略
默认情况下TextIuptFormat对任务的切片机制时候按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个maptask,这样如果有大量小文件,就会产生大量的maptask,处理效率及其低下
优化策略
- 最好的办法,在数据处理系统的最前端(预处理),将小文件先合并成大文件,在上传HDFS做后续分析;
- 补救措施:如果已经是大量的小文件在HDFS上了,可以使用另一种CombineTextInputFormat来做切片,它的切片逻辑可以将多个小文件从逻辑上规划为一个切片中,这样多个小文件就交给一个maptask进行处理
- 优先满足最小切片大小,不超过最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152); // 2m
举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m 这样最后就是三个分区
具体实现(需要首先在Driver中进行注册)
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
3.2.3 CombineTextInputFormat案例
1. 需求:将输入的大量小文件合并成成以一个切片统一处理
2. 输入数据:准备五个小文件
3. 实现过程
未作任何处理,在最初的wordcount程序中,观察切片个数为5
在WordCountDriver中增加下面的代码,运行程序,观察切片信息
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
3.3 MapTask工作机制
3.3.1 并行度决定机制
1. 问题引出
maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。那么maptask的并行任务是否越多越好呢?
2. MapTask并行度决定机制
一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定
下面两个图解释了为什么分片要和block块的大小一致,切片大小跟hdfs存储block大小不一致会导致,数据传输的问题,在大数据中,宁可移动计算,也不要移动数据
3.3.2 MapTask工作机制
(1)Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用context.write,context.write底层 OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并等操作。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
3.4 Shuffle机制
3.4.1 shuffle机制
Mapreduce确保每个reducer的输入都是按key排序的。系统执行排序的过程(即将mapper输出作为输入传给reducer)称为shuffle
3.4.2 Partition分区
分区:把数据扎堆存放
问题引出:要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
1. 默认partition分区 hello-->hash%reducetask数量
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
默认分区是根据key的hashcode对reducetask的个数取模得到的,用户无法控制那个key存储到哪个分区
2. 自定义partition步骤
1. 自定义类继承Partitioner,重写getPartition()方法
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.junit.Test;
public class ProvincePartition extends Partitioner<Text,FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int i) {
String preNum = text.toString().substring(0,3);
int partition = 4;
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
2. 在job驱动类中,注册自定义分区类
job.setPartitionerClass(CustomPartitioner.class);
3. 自定义partition后,根据自定义partition的逻辑设置相应数量的reducetask
job.setNumReduceTasks(5);
3. 注意
reduceTask的个数决定了有几个文件!!
如果reduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
如果1< reduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception;
如果reduceTask的数量 = 1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;
例如:假设自定义分区数为5,则
(1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
(2)job.setNumReduceTasks(2);会报错
(3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件
3.4.3 partition分区案例
1. 需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
2. 数据准备:使用流量统计案例中的数据
3. 分析
(1)Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。默认的分发规则为:根据key的hashcode%reducetask数来分发
(2)如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner,自定义一个CustomPartitioner继承抽象类:Partitioner
(3)在job驱动中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)
4. 在流量统计案例基础上,增加一个自定义分区类
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.junit.Test;
public class ProvincePartition extends Partitioner<Text,FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int i) {
String preNum = text.toString().substring(0,3);
int partition = 4;
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
5. 在驱动类中将自定义的分区类注册并设置reducetask的数量,在Driver类基础上增加下面内容
// 8 将自定义数据分区注册
job.setPartitionerClass(ProvincePartitioner.class);
// 9 设置相应数量的reduce task
job.setNumReduceTasks(5);
3.4.4 WritableComparable排序
排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序。
对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。
对于Reduce Task,它从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。
每个阶段的默认排序
1. 排序的分类
(1)部分排序:
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。
(2)全排序:
如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。
替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。
2. 自定义排序WritableComparable
bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
if(this.sumFlow==o.getSumFlow()){
This.downFlow>o.getDownFlow() ? -1 :1
}
}
3.4.5 WritableComparable排序案例
案例一
1. 需求: 根据流量统计的结果再次对总流量进行排序
2. 代码实现
(1)在FlowBean基础上增加了比较功能,在原先代码基础上增加该方法即可
@Override
public int compareTo(Object o) {
FlowBean f = (FlowBean)o;
return this.sumFlow > f.getSumFlow()? -1 : 1;
}
(2)mapper类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
FlowBean flowBean = new FlowBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割字段
String[] fields = line.split("\t");
// 3 封装对象
// 取出手机号码
String phoneNum = fields[0];
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);
long sumFlow = Long.parseLong(fields[3]);
flowBean.setUpFlow(upFlow);
flowBean.setDownFlow(downFlow);
flowBean.setSumFlow(sumFlow);
k.set(phoneNum);
// 4 写出
context.write(flowBean, k);
}
}
(3)reducer类
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 1 遍历所用bean,将其中的上行流量,下行流量分别累加
for (Text value : values) {
context.write(value,key);
}
}
}
(4)Driver类
import java.io.IOException;
import com.bigdata.mapreduce.flow.FlowBean;
import com.bigdata.mapreduce.flow.FlowCountMapper;
import com.bigdata.mapreduce.flow.FlowCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowsumDriver {
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowsumDriver.class);
// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
}
}
案例二
1. 需求:要求每个省份手机号输出的文件中按照总流量内部排序。
2. 分析:基于前一个需求,增加自定义分区类即可。
(1)增加自定义分区类
package com.bigdata.mapreduce.sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean key, Text value, int numPartitions) {
// 1 获取手机号码前三位
String preNum = value.toString().substring(0, 3);
int partition = 4;
// 2 根据手机号归属地设置分区
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
(2)在驱动类中注册分区类
// 加载自定义分区类
job.setPartitionerClass(FlowSortPartitioner.class);
// 设置Reducetask个数
job.setNumReduceTasks(5);
3.4.6 Combiner合并
比如:<b,1> <b,1> == <b,2>
-
combiner是MR程序中Mapper和Reducer之外的一种组件。
-
combiner组件的父类就是Reducer。
-
combiner和reducer的区别在于运行的位置:
-
Combiner是在每一个maptask所在的节点运行;
-
Reducer是接收全局所有Mapper的输出结果;
-
-
combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
-
combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来
-
自定义Combiner实现步骤
(1)自定一个combiner继承Reducer,重写reduce方法
package com.bigdata.mr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// 1 汇总
int count = 0;
for(IntWritable v :values){
count += v.get();
}
// 2 写出
context.write(key, new IntWritable(count));
}
}
(2)在驱动类中指定combiner
// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountCombiner.class);
运行程序,如图所示
3.5 ReduceTask工作机制
1.设置ReduceTask并行度(个数)
reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:
//默认值是1,手动设置为4
job.setNumReduceTasks(4);
2.注意
- reducetask=0 ,表示没有reduce阶段,输出文件个数和map个数一致。
- reducetask默认值就是1,所以输出文件个数为一个。
- 如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜
- reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask。
- 具体多少个reducetask,需要根据集群性能而定。
- 如果分区数不是1,但是reducetask为1,是否执行分区过程。答案是:不执行分区过程。因为在maptask的源码中,执行分区的前提是先判断reduceNum个数是否大于1。不大于1肯定不执行。
4.ReduceTask工作机制
- (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- (2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
- (3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
- (4)Reduce阶段:reduce()函数将计算结果写到HDFS上。
3.6 MapReduce Join(关联)
3.6.1 Reduce Join
1. 原理
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行合并就ok了。
3.6.2 Reduce join案例
1. 需求:将商品信息表中数据根据商品pid合并到订单数据表中。
现在我们通过MapReduce的方式实现,通过关联条件作为map输出的key,将两个表满足join条件的数据(包含数据来源于哪一个文件的标识),发往统一个reduce task,在reduce中进行数据的串联
1. 创建商品和订单合并后的bean类
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TableBean implements WritableComparable<TableBean> {
private String order_id; // 订单id
private String pid; // 商品id
private int account; // 商品数量
private String pname; // 商品名称
private String flag; // 标记位,标记该bean来自于哪里,0代表订单,1代表商品
public TableBean() {
}
public TableBean(String order_id, String pid, int account, String pname, String flag) {
this.order_id = order_id;
this.pid = pid;
this.account = account;
this.pname = pname;
this.flag = flag;
}
public String getOrder_id() {
return order_id;
}
public void setOrder_id(String order_id) {
this.order_id = order_id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAccount() {
return account;
}
public void setAccount(int account) {
this.account = account;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public String toString() {
return order_id + '\t' + pname + '\t'+ account ;
}
// 序列化:写字符串使用writeUTF
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(order_id);
out.writeUTF(pid);
out.writeInt(account);
out.writeUTF(pname);
out.writeUTF(flag);
}
//反序列化
@Override
public void readFields(DataInput in) throws IOException {
order_id = in.readUTF();
pid = in.readUTF();
account = in.readInt();
pname = in.readUTF();
flag = in.readUTF();
}
@Override
public int compareTo(TableBean o) {
return 1;
}
}
2. Mapper类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
TableBean bean = new TableBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取文件输入类型
FileSplit split = (FileSplit) context.getInputSplit();
String name = split.getPath().getName();
//2.获取输入数据
String line = value.toString();
//3.不同文件分别处理
if (name.startsWith("order")) { //订单表处理
// 切割
String[] strings = line.split("\t");
//封装对象
bean.setOrder_id(strings[0]);
bean.setPid(strings[1]);
bean.setAccount(Integer.parseInt(strings[2]));
bean.setPname("");
bean.setFlag("0");
k.set(strings[1]);
} else { //商品表处理
// 切割
String[] strings = line.split("\t");
//封装
bean.setPid(strings[0]);
bean.setPname(strings[1]);
bean.setFlag("1");
bean.setAccount(0);
bean.setOrder_id("");
k.set(strings[0]);
}
context.write(k, bean);
}
}
3. Reducer类
import javafx.scene.control.Tab;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.codehaus.jackson.map.util.BeanUtil;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
//1.准备存储订单的集合
ArrayList<TableBean> ordersBean = new ArrayList<>();
//2.准备bean对象
TableBean pbBean = new TableBean();
for (TableBean value : values) {
if ("0".equals(value.getFlag())) {
// 拷贝传递过来的每条订单数据到集合中
TableBean orderBean = new TableBean();
try {
BeanUtils.copyProperties(orderBean,value);
} catch (Exception e) {
e.printStackTrace();
}
ordersBean.add(orderBean);
} else {
try {
BeanUtils.copyProperties(pbBean,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
//3.表的拼接
for (TableBean tableBean : ordersBean) {
tableBean.setPname(pbBean.getPname());
//4.将数据写出去
context.write(tableBean, NullWritable.get());
}
}
}
4. Driver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TableDriver {
public static void main(String[] args) throws Exception {
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程序的jar包所在的本地路径
job.setJarByClass(TableDriver.class);
// 3 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
// 4 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
}
}
5. 运行查看结果
1001 小米 1 1001 小米 1 1002 华为 2 1002 华为 2 1003 格力 3 1003 格力 3 |
缺点:这种方式中,合并操作是在reduce阶段完成,reduce端的压力太大,map节点的运算的负载则很低,资源率不高,并且在reduce阶段内极易产生数据倾斜(某个reduce接收到的数据量特别大)
我们可以采用在map端进行数据合并来解决这个问题
3.6.3 Map join
适用于的场景:一张表特别大,而另一张表很小
在这种情况下,在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端数据的压力,尽可能的减少数据倾斜
1. 具体实现
可以采用distributedcache,将小表提前加载到缓存集合中,mapper在setup的时候将小表架子啊到本地内存,在本地对地自己读到的大表数据进程业务逻辑合并并输出结果,可以大大提高合并操作二点并发度,加快处理速度
对于这个案例,在map端进行join操作后就不需要reduce阶段了,直接设置reducetask 的数量为0即可
1. Driver类: 现在驱动类中添加缓存文件(第6)
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DistributedCacheDriver {
public static void main(String[] args) throws Exception {
// 1 获取job信息
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置加载jar包路径
job.setJarByClass(DistributedCacheDriver.class);
// 3 关联map
job.setMapperClass(DistributedCacheMapper.class);
// 4 设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 加载缓存数据
job.addCacheFile(new URI("file:///e:/inputcache/pd.txt"));
// 7 map端join的逻辑不需要reduce阶段,设置reducetask数量为0
job.setNumReduceTasks(0);
// 8 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
2. Mapper类:读取缓存集合中的数据
import com.bigdata.mapreduce.table.TableBean;
import org.apache.commons.io.input.BOMInputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable,Text,TableBean,NullWritable>{
//用来存储读取到的缓存数据
Map<String,String> pdMap = new HashMap<>();
// 读取缓存文件,转换成我们方便使用的数据结构备用
@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new BOMInputStream(new FileInputStream("pd.txt"))));
String line;
while(StringUtils.isNotEmpty(line=bufferedReader.readLine())) {
String[] fields = line.split("\t");
pdMap.put(fields[0],fields[1]);
}
bufferedReader.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
TableBean tableBean = new TableBean();
String line = value.toString();
String[] fields = line.split("\t");
String order_id = fields[0];
String pid = fields[1];
int account = Integer.parseInt(fields[2]);
tableBean.setOrder_id(order_id);
tableBean.setPid(pid);
tableBean.setAccount(account);
tableBean.setPname(pdMap.get(pid)); // 直接从缓存中取出商品名称
tableBean.setFlag("");
context.write(tableBean,NullWritable.get());
}
}