MapReduce概述和编写简单案例-01
MapReduce概念
Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
为什么要MapReduce
1)海量数据在单机上处理因为硬件资源限制,无法胜任
2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。
4)mapreduce分布式方案考虑的问题
(1)运算逻辑要不要先分后合?
(2)程序如何分配运算任务(切片)?
(3)两阶段的程序如何启动?如何协调?
(4)整个程序运行过程中的监控?容错?重试?
分布式方案需要考虑很多问题,但是我们可以将分布式程序中的公共功能封装成框架,让开发人员将精力集中于业务逻辑上。而mapreduce就是这样一个分布式程序的通用框架。
MapReduce核心思想
上图简单的阐明了map和reduce的两个过程或者作用,虽然不够严谨,但是足以提供一个大概的认知,map过程是一个蔬菜到制成食物前的准备工作,reduce将准备好的材料合并进而制作出食物的过程
MapReduce进程
一个完整的mapreduce程序在分布式运行时有三类实例进程:
1)MrAppMaster:负责整个程序的过程调度及状态协调
2)MapTask:负责map阶段的整个数据处理流程
3)ReduceTask:负责reduce阶段的整个数据处理流程
MapReduce编程规范(八股文:也就是按这种格式来写,都一样的套路写)
用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
1)Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5)map()方法(maptask进程)对每一个<K,V>调用一次
2)Reducer阶段
(1)用户自定义的Reducer要继承自己的父类
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
3)Driver阶段
整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象
常用数据序列化类型
常用的数据类型对应的hadoop数据序列化类型
Java类型 |
Hadoop Writable类型 |
boolean |
BooleanWritable |
byte |
ByteWritable |
int |
IntWritable |
float |
FloatWritable |
long |
LongWritable |
double |
DoubleWritable |
string |
Text |
map |
MapWritable |
array |
ArrayWritable |
案例实操wordcount案例,统计单词出现次数
编码先导入hadoop依赖包,下面这个路径里有详细操作步骤
https://blog.****.net/kxj19980524/article/details/89043569
mapper阶段
Mapper<LongWritable, Text, Text, IntWritable>
mapper后面的这四个参数,就是输入key类型输入value类型,输出key类型,输出value类型
看上图,读取hello.txt数据的时候它是一行一行读的,key就是行号,value就是这一行的字符串,对照上面hadoop与java对应的数据类型,long对应的是LongWritable,String对应的text所以第一个参数是LongWritable,第二个参数为Text,看下面代码,mapper里做的工作是把每行的数据按空格进行分开,然后输出到缓冲区,没分出一个单词就输出一次,输出的key就是单词名,value就是1.有几个重复的单词就在缓冲区value的数组里加个1,可以把value看成是一个数组.
package com.buba.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 接口参数:
* 第一个参数 KEYIN:输入数据的key 文件的行号
* 第二个参数 VALUEIN:每行的输入数据
*
* 第三个参数 KEYOUT:输出数据的key
* 第四个参数 VALUEOUT:输出数据的value类型
*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//hello world
//hadoop spark
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取这一行数据
String line = value.toString();
//2.获取每一个单词
String[] words = line.split(" ");
for(String word : words){
System.out.println(word);
//3.输出每一个单词
context.write(new Text(word),new IntWritable(1));
}
}
}
在源码中可以看到,每次写出一条数据的时候都会在这判断是否有下一行,然后再进map方法
reduce阶段
reducer后面的四个参数跟上面同理,整个mapper阶段执行完后才执行reducer阶段.这里面的逻辑就是把mapper阶段拆开的单词数量进行累加然后输出出去.
package com.buba.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordcountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1.统计所有单词个数
int count = 0;
for(IntWritable value:values){
System.out.println(key);
System.out.println(value);
count += value.get();
}
//2.输出所有单词个数
context.write(key,new IntWritable(count));
}
}
driver阶段
就是设置一些job的参数,这个mapreduce的各个参数.
package com.buba.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//驱动主程序
public class WordcountDriver {
public static void main(String[] args) throws Exception{
//1.获取job对象信息 Configuration这个对面默认从本地获取,直接new对象就行不用添加参数
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//2.设置加载jar位置
job.setJarByClass(WordcountDriver.class);
//3.设置mapper和reducer的class类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
//4.设置输出mapper的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置最终数据输出的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.设置输入数据和输出数据路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//7.提交
boolean result = job.waitForCompletion(true);
//如果成功返回0,失败返回1
System.exit(result?0:1);
}
}
进行测试
测试1,在window测试,需要安装hadoop环境变量.把main方法添加两个参数,因为driver代码里调用了args数组里的两个值.这两个参数第一个是输入参数路径,就是源数据文件,第二个参数是mapreduce执行完后生成的文件路径地址,注意,这个路径不能存在,是执行完后自动生成的,如果这个文件本来就存在会报错.
放入一个log4j,要不然出不来日志
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n