MapReduce核心思想+详细流程学习(附案例代码)
-
MapReduce概述
-
MapReduce定义
-
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
-
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
-
-
优缺点
-
-
-
缺点: 慢
-
-
-
MapReduce的核心思想
-
map 映射
-
reduce 合并
-
MapReduce进程
-
写WordCount源码
-
常用的数据类型对应的Hadoop数据序列化类型
-
Java类型
|
Hadoop Writable类型
|
boolean
|
BooleanWritable
|
byte
|
ByteWritable
|
int
|
IntWritable
|
float
|
FloatWritable
|
long
|
LongWritable
|
double
|
DoubleWritable
|
String
|
Text
|
map
|
MapWritable
|
array
|
ArrayWritable
|
-
MR的Hello world
-
新建maven 项目 依赖同hdfs
-
导入log4j.properties
-
新建包 com.stw.wordcount
-
新建三个类 WCmapper WCreducer WCdriver
-
WordcountMapper.java
//LongWritable代表每行的偏移量,Text代表每行的数据
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
//Context是任务的始终
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 输出 遍历数组,把单词编程(word,1)的形式交给框架
for (String word : words) {
k.set(word);
context.write(k, v);//context.write(Text, IntWritable) keyout 和valueout
//context.write(new Text(word), new IntWritable(1)); 性能降低 回避大量new对象
}
}
}
WordcountReducer.java
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
int sum;
IntWritable v = new IntWritable();
@Override
//在reduce上已经完成了key的分类
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
context.write(key,v);
}
}
WordcountDriver.java
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息以及Job实例 Job实际上就是Context
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置类路径(classpath)
job.setJarByClass(WordcountDriver.class);
// 3 设置Mapper和Reducer的calsspath
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
// 4 设置mapper输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出数据文件路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交我们的job
boolean result = job.waitForCompletion(true); //job是否完成
System.exit(result ? 0 : 1);
}
}
-
重写方法快捷键 ctrl+o
-
IDEA中设置参数
-
项目的打包
-
点击maven project
-
选择lifecycle 双击packege
-
出现target文件夹 里边有jar
-
copy jar到集群中
-
hadoop jar 1.jar com.atguigu.wordcount.WCDriver /1.txt /oooout
-
-
Hadoop序列化
-
序列化和反序列化
-
-
只序列化关键属性。
-
-
自定义值序列化接口
-
自定义bean 实现Writeable接口
-
必须实现write 和 readFields方法
-
read和write顺序一定要一致
-
-
-
案例二
-
package com.stw;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {
long upflow;
long downflow;
long sumflow;
public FlowBean() {
}
@Override
public String toString() {
return upflow+"\t"+downflow+"\t"+sumflow;
}
public void set(long upflow, long downflow)
{
this.upflow=upflow;
this.downflow=downflow;
this.sumflow=upflow+downflow;
}
public long getUpflow() {
return upflow;
}
public void setUpflow(long upflow) {
this.upflow = upflow;
}
public long getDownflow() {
return downflow;
}
public void setDownflow(long downflow) {
this.downflow = downflow;
}
public long getSumflow() {
return sumflow;
}
public void setSumflow(long sumflow) {
this.sumflow = sumflow;
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upflow);
dataOutput.writeLong(downflow);
dataOutput.writeLong(sumflow);
}
public void readFields(DataInput dataInput) throws IOException {
upflow = dataInput.readLong();
downflow = dataInput.readLong();
sumflow = dataInput.readLong();
}
}
package com.stw;
import com.stw.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean> {
private FlowBean flowBean=new FlowBean();
private Text phone=new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splits = value.toString().split("\t");
phone.set(splits[1]);
long upflow=Long.parseLong(splits[splits.length-3]);
long downflow=Long.parseLong(splits[splits.length-2]);
flowBean.set(upflow,downflow);
System.out.println(flowBean);
context.write(phone,flowBean);
}
}
package com.stw;
import com.stw.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
private FlowBean sumbean=new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long sumup=0;
long sumdown=0;
for(FlowBean flowBean:values)
{
System.out.println(flowBean);
sumup+=flowBean.getUpflow();
sumdown+=flowBean.getDownflow();
}
sumbean.set(sumup,sumdown);
context.write(key,sumbean);
}
}
package com.stw;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
-
MapReduce框架原理(**!!)
-
MR数据流
-
-
InputForamt:把输入的文件变成K,V值
-
切分数据
-
由文件变为切片的过程
-
生成record reader 把切片变为K-V值
-
TextInputFormat 切片方法---- kv方法
-
直接使用FIF的切片方法 每个文件 - 1.1倍
-
LineRecordReader 行的内容作为value 行的offset作为key
-
-
KeyValueInputFormat
-
直接使用FIF的切片方法 每个文件 - 1.1倍
-
KeyValueLineRecordReader 用指定的分隔符来划分Key Value
-
-
NLine
-
切片规则:自定义,N行一片
-
LineRecordReader 行的内容作为value 行的offset作为key
-
-
CombineTextInputFormat 小文件过多的场景
-
切片方法自定义
-
CombineFileRecordReader 由于跨文件
-
-
FixedLengthInputFormat(不常用)
-
FIF的切片方法
-
FixedLengthRecordReader
-
-
SequenceFileInputFormat
-
FIF的切片方法
-
SequenceFileRecordReader
-
两个mapreduce中间 两个任务的中间 前仆后继
-
-
-
-
-
Mapper
-
Shuffle
-
Reducer
-
OutputFormat
-
-
Job任务提交都做了什么事 --源码分析
-
确保JobState Defined
-
setUseNewAPI 兼容 老API
-
connect 连接集群和yarn
-
判断是本地还是Yarn的生成cluster
-
-
JobsubmitInternal
-
checkSpecs 检查输出路径
-
生成jobId 在临时文件夹中生成job
-
copyAndConfigureFiles
-
设置map的切片数量
-
写配置文件 到job.xml job.split jpb.splitmetainfo
-
真正提交任务job
-
-
-
切片怎么切
-
getSplits 逐个文件切
-
自定义SplitSize 可以提高minSize大小
-
切片规则:判断文件剩余部分按照1.1倍判断,按照1倍切,过小的切片会被分到上一个切片上
-
自定义InputFormat
-
需求
-
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
-
继承FileInputFormat 不用写切片方法
-
重写RecordReader 处理一个文件 把这个文件直接读成KV值
-
-
initialize
-
nextKeyValue
-
-
if(notRead)
-
{
-
具体读文件过程
-
notRead=false;
-
return true;
-
}
-
else
-
return false;
-
-
-
getCurrentKey
-
return key;
-
getCurrentValue
-
return value;
-
-
getProgress notRead=true
-
读了就是1
-
return notRead?0:1
-
-
close
-
-
由于需求不允许文件内部切片所以重写 isSpiltable方法 return false
-
如果Mapper和Reducer没有干事的话 就用默认的
-
-
自定义InputFormat代码
package com.stw.inputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class MyInputFormat extends FileInputFormat<Text,BytesWritable> {
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new MyRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
package com.stw.inputFormat;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class MyRecordReader extends RecordReader<Text,BytesWritable> {
private boolean NotRead=true;
private Text key=new Text();
private BytesWritable value=new BytesWritable();
private FileSplit fileSplit=null;
private FSDataInputStream inputStream=null;
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
fileSplit= (FileSplit) inputSplit;
Path path = fileSplit.getPath();
FileSystem fileSystem = path.getFileSystem(taskAttemptContext.getConfiguration());
inputStream = fileSystem.open(path);
}
public boolean nextKeyValue() throws IOException, InterruptedException {
if(NotRead)
{
key.set(fileSplit.getPath().toString());
byte[] buf=new byte[(int) fileSplit.getLength()];
int read = inputStream.read(buf);
value.set(buf,0,buf.length);
NotRead=false;
return true;
}
else {
return false;
}
}
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
public float getProgress() throws IOException, InterruptedException {
if(NotRead)
return 0;
else
return 1;
}
public void close() throws IOException {
IOUtils.closeStream(inputStream);
}
}
Driver
-
MapReduce详细工作流程
-
局部快排,整体归并 Shuffle
-
Partition 分区
-
MapTask的数量是切片数量
-
ReduceTask的数量是手设的job.setNumReduceTasks(10)
-
分区号对应去第几个ReduceTask (分区号的获取)
-
&Max_value 是防止负数
-
-
自定义分区(泛型是Mapper的输出)
package com.stw.Partation;
import com.stw.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPart extends Partitioner<Text,FlowBean> {
public int getPartition(Text text, FlowBean flowBean, int i) {
String phone_pre = text.toString().substring(0, 3);
switch (phone_pre) {
case "136":
return 0;
case "137":
return 1;
case "138":
return 2;
case "139":
return 3;
default:
return 4;
}
/* if (phone_pre.equals("136"))
return 0;
else if (phone_pre.equals("136"))
return 1;
else if (phone_pre.equals("136"))
return 2;
if (phone_pre.equals("136"))
return 3;
else
return 4;
}*/
}
}
-
-
driver中设置
-
如果分区号的个数大于ReduceTask的个数会报错
-
分区号个数可以小于ReduceTask,但是没有对应的ReduceTask没有输出
-
分区号不能跳,跳了会有空的ReduceTask
-
-
-
WritableComparable排序
-
要排序就要把排序的东西作为Key 并实现WriteComparable接口
package com.stw.MyFlow;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class mFlomBean implements WritableComparable<mFlomBean> {
long upflow;
long downflow;
long sumflow;
public mFlomBean() {
}
@Override
public String toString() {
return upflow+"\t"+downflow+"\t"+sumflow;
}
public void set(long upflow, long downflow)
{
this.upflow=upflow;
this.downflow=downflow;
this.sumflow=upflow+downflow;
}
public long getUpflow() {
return upflow;
}
public void setUpflow(long upflow) {
this.upflow = upflow;
}
public long getDownflow() {
return downflow;
}
public void setDownflow(long downflow) {
this.downflow = downflow;
}
public long getSumflow() {
return sumflow;
}
public void setSumflow(long sumflow) {
this.sumflow = sumflow;
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upflow);
dataOutput.writeLong(downflow);
dataOutput.writeLong(sumflow);
}
public void readFields(DataInput dataInput) throws IOException {
upflow = dataInput.readLong();
downflow = dataInput.readLong();
sumflow = dataInput.readLong();
}
@Override
public int compareTo(mFlomBean o) {
return Long.compare(o.sumflow,this.sumflow);//降序
}
}
-
排序+自定义分区
-
按照value分区
-
-
Combiner合并 --目的:减小IO
-
Combiner用于Mapper的MapTask的局部的汇总
-
默认不启动
-
-
Combiner输入与输出的类型 必须一样(输入类型必须有序)
-
两次归并,一次快排
-
作用在两次归并上,目的是去除重复的key值
-
-
自定义Combiner
-
job.setCombinerClass(); 减小map阶段的局部输出(和Reducer类似)
-
-
GroupingComparator分组(辅助排序) 注意MapReduce 先排序后分组
-
默认的分组是按照key的默认Compare方法来分组,key值相等为一组
-
分组案例
-
再排序时按照订单号先排序再按价钱降序(升序this在前,降序other在前)
-
OrderBean
-
序列化write、readFields
-
比较方法: 先比较orderID再比较price (二次排序)
-
orderDriver
-
orderMapper NullWritable.get()
-
orderReducer
-
orderComparator---GroupingComparator[Hadoop中的数据传递都是序列化的(可存磁盘,紧凑),不是对象的传递,所以在获取对象时,要先进行反序列化]
-
重写注意compare 选择 a:WritableComparator b:WritableComparator
-
反序列化
-
-
注意:OrderBean对象中的第一个订单对象是价钱最高的对象,即排序最靠前的
-
为什么全出来了??key是一个对象,value是单例的空值啊?
-
实际上,values对象只是一个普通的对象, 在reducer中实际上是一个反序列化的过程,把序列化中存储的k,v值依次赋给key,value
-
故对象是一个,但是值有所有的值
-
反序列化过程 对排好序的k,v 进行分组 同一组返回true 不同返回false
-
-
-
取出每个订单的前两名
-
-
-
-
Shuffle机制
-
先分区再排序的本质就是二次排序
-
环形缓冲区
-
逻辑上环形,左边存索引,右边存k,v
-
-
在环形缓冲区在百分之80时,开始溢写
-
在环形缓冲区中排序,交换索引
-
-
-
-
背过!
-
合并:去重复key,归并:两个有序文件的合并
-
-
分组的目的是让不同组的数据k,v值,分批次进入不同的reduce方法
-
-
OutputFormat数据输出 K-V值到文件
-
自定义OutputFomat
-
MyOutputFormat 集成 FileOutputFormat (泛型是reduce的输出)
-
重写RecordWriter getRecordWriter方法
-
-
自定义MyRecordWriter (来一组K-v值写到文件)
-
注意recordwriter拿一行的数据需要加换行
-
Driver
-
问题: 输出文件夹没有使用上,改变开流方式
public class MyRecordWriter extends RecordWriter<LongWritable,Text> {
private FSDataOutputStream fs1;
private FSDataOutputStream fs2;
public void init(TaskAttemptContext job) throws IOException {
String outdir=job.getConfiguration().get(FileOutputFormat.OUTDIR);
FileSystem fileSystem = FileSystem.get(job.getConfiguration());
fs1=fileSystem.create(new Path(outdir+"/atguigu.log"));
fs2=fileSystem.create(new Path(outdir+"/other.log"));
}
@Override
public void write(LongWritable longWritable, Text text) throws IOException, InterruptedException {
String line =text.toString();
if(line.contains("atguigu"))
{
fs1.write((line+"\n").getBytes());
}
else
{
fs2.write((line+"\n").getBytes());
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
IOUtils.closeStream(fs1);
IOUtils.closeStream(fs2);
}
}
----------------------------------------------------------------------------------------
public class MyOut extends FileOutputFormat<LongWritable,Text> {
@Override
public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
MyRecordWriter myRecordWriter=new MyRecordWriter();
myRecordWriter.init(taskAttemptContext);
return myRecordWriter;
}
}
----------------------------------------------------------------------------------------
public class outputDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job=Job.getInstance(new Configuration());
job.setJarByClass(outputDriver.class);
job.setOutputFormatClass(MyOut.class);
FileInputFormat.setInputPaths(job,new Path("d:/test2/input/log.txt"));
FileOutputFormat.setOutputPath(job,new Path("d:/test2/output"));
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
-
Reduce Join(Hive本质是用sql的方式写MapReduce)
-
核心:reduce方法中的内容
-
-
select (o.id,p.pname,o.amount)
-
from order o
-
left join pd p
-
on o.pid=p.pid
-
相同的pid进入同一组
-
OrderBean pid 和pname 二次排序
-
-
Mapper 的setup方法 :在开始时执行一次
-
mapper获取文件切片对应的文件名
-
-
-
cleanup方法:在结束时执行一次
-
-
-
-
Map Join
-
使用场景 :N张表十分小,一张表很大
-
优点:使用它就不需要reduce shuffle job.setNumReduceTasks(0)
-
解决数据倾斜
-
-
-
小表缓存
-
-
Mapper缓存文件在setup中读入
-
计数器
-
数据清洗 (计数器案例) getCounter
-
简易版
-
-
-
计数器复杂版
-
MapReduce总结