MapReduce编程模型实现WordCount程序,在搭建的YARN上运行
##本程序在hadoop集群hdfs系统搭建完成,namenode,datanode可正常启动并使用的基础上进行。
-
首先需要搭建YARN平台,修改hadoop下的配置文件yarn-site.xml即可。
<configuration>
<!-- Host that runs the ResourceManager -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hdp-01</value>
</property>
<!-- NodeManager auxiliary service required by the MapReduce shuffle -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- NodeManager memory in MB; needs at least ~1.5 GB or containers fail to start -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>
<!-- Number of CPU vcores the NodeManager may allocate -->
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>
</configuration>
然后将yarn-site.xml拷贝到其他集群下的同样目录下。scp yarn-site.xml hdp-02:$PWD
执行start-yarn.sh,在linux中查看resourcemanager和nodemanager是否启动成功。或者在网页上http://hdp-01:8088/进行查看,如下图即是成功,
#eclipse是按照可对hdfs文件系统操作的基础上进行程序编写配置。hdfs文件系统操作开发环境详情查阅:利用Java API对HDFS文件系统进行操作
-
将缺少的yarn、mapreduce所需要的包以及依赖包(lib)导入到eclipse中。
WordCountMapper.java
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused output objects: Hadoop serializes the values immediately on
    // write(), so reusing one Text/IntWritable per mapper avoids allocating
    // two objects per token (standard Hadoop idiom).
    private final Text outWord = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Tokenizes one input line and emits (word, 1) for every token.
     *
     * @param key     byte offset of the line within the input split (unused)
     * @param value   the text of one input line
     * @param context MapReduce context used to emit output pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the received value to a Java string.
        String line = value.toString();
        // Split on runs of whitespace: the original split(" ") emitted empty
        // "words" for consecutive spaces and did not handle tabs.
        for (String word : line.split("\\s+")) {
            if (word.isEmpty()) {
                continue; // leading whitespace yields one empty leading token
            }
            outWord.set(word);
            context.write(outWord, ONE);
        }
    }
}
WordCountReducer.java
package mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sums the per-word partial counts emitted by the mappers and writes
     * the total occurrence count for the word.
     *
     * @param word    the word being reduced
     * @param values  all partial counts produced for this word
     * @param context MapReduce context used to emit the (word, total) pair
     */
    @Override
    protected void reduce(Text word, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        // Enhanced-for over the value iterable instead of an explicit Iterator.
        for (IntWritable partial : values) {
            total += partial.get();
        }
        context.write(word, new IntWritable(total));
    }
}
JobSubmit.java 在windows系统中提交,需要导出jar包到指定位置。
package mapreduce;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce client that submits the WordCount job to YARN from a Windows
 * workstation. It bundles the parameters the job needs at runtime and
 * interacts with YARN to launch the MapReduce program.
 */
public class JobSubmit {
    public static void main(String[] args) throws Exception {
        // Run as "root" so HDFS permission checks pass when submitting from
        // Windows; this JVM property could also be set in the run config.
        System.setProperty("HADOOP_USER_NAME", "root");

        // Cluster connection settings.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hdp-01:9000");      // NameNode address
        conf.set("mapreduce.framework.name", "yarn");        // run on YARN, not locally
        conf.set("yarn.resourcemanager.hostname", "hdp-01"); // ResourceManager host
        // Required when submitting from Windows to a Linux cluster.
        conf.set("mapreduce.app-submission.cross-platform", "true");

        Job job = Job.getInstance(conf);

        // Location of the exported job jar on the local (Windows) disk.
        job.setJar("E:\\jar\\thy.jar");
        //job.setJarByClass(JobSubmit.class);

        // Mapper/Reducer implementations and their output key/value types.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // The output directory must not exist when the job starts, so remove
        // a stale one from a previous run up front.
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), conf, "root");
        Path outputDir = new Path("/hdfsDemo/output");
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true);
        }

        // Input data set and result location on HDFS.
        FileInputFormat.setInputPaths(job, new Path("/hdfsDemo/input"));
        FileOutputFormat.setOutputPath(job, outputDir);

        // Number of reduce tasks to launch.
        job.setNumReduceTasks(2);

        // Submit the job to YARN and block until it completes.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : -1);
    }
}
JobSubmit2.java ,在Linux中运行。利用hadoop jar指令直接运行导出jar包的main类,也就是jobsubmit2.java。hadoop jar会自动的加载安装目录下所有的jar包,和配置文件到classpath中。
最后,在集群上上传本程序jar包到任意一台,执行 hadoop jar 你所上传的.jar 包.JobSubmit2执行。
我的是 hadoop jar thy.jar mapreduce.JobSubmit2
package mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Submits the WordCount job from a node inside the cluster via
 * {@code hadoop jar thy.jar mapreduce.JobSubmit2}. The hadoop launcher puts
 * the cluster configuration files on the classpath, so no explicit
 * fs.defaultFS / yarn settings are needed here.
 */
public class JobSubmit2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // BUG FIX: the Configuration was created but Job.getInstance() was
        // called without it, so the conf object was silently ignored. Pass
        // it so any settings loaded into it actually reach the job.
        Job job = Job.getInstance(conf);
        // Locate the job jar from the class that contains this main method.
        job.setJarByClass(JobSubmit2.class);
        // Mapper/Reducer implementations and their output key/value types.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input data set and result location on HDFS; the output directory
        // must not already exist.
        FileInputFormat.setInputPaths(job, new Path("/hdfsDemo/input"));
        FileOutputFormat.setOutputPath(job, new Path("/hdfsDemo/output"));
        // Number of reduce tasks to launch.
        job.setNumReduceTasks(2);
        // Submit and block until the job finishes.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : -1);
    }
}
注意如果不利用maven进行jar包导出的话,一定不能按照整个程序导出jar包,否则会path相关报错,具体原因不详。
在数据集比较小的时候,本地运行速度非常快。开发过程中可选用部分数据集在本地调试运行,具体代码如下:
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Runs the MapReduce WordCount locally on Windows, useful for testing
 * against a small data set before submitting to the cluster.
 *
 * @author THY
 */
public class JobSubmitLocalRun {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Use the local filesystem and the local (in-process) MR runner.
        conf.set("fs.defaultFS", "file:///");
        conf.set("mapreduce.framework.name", "local");
        Job job = Job.getInstance(conf);
        // BUG FIX: previously referenced JobSubmit2.class; the jar must be
        // located from the class containing THIS main method.
        job.setJarByClass(JobSubmitLocalRun.class);
        // Mapper/Reducer implementations and their output key/value types.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output paths on the local Windows disk; the output
        // directory must not already exist.
        FileInputFormat.setInputPaths(job, new Path("E:\\WindowsLocalRunMR\\input"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\WindowsLocalRunMR\\output"));
        // Number of reduce tasks to launch.
        job.setNumReduceTasks(2);
        // Run in-process and block until the job finishes.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : -1);
    }
}
成功运行结果如图示:
Linux运行结果:
eclipse运行结果: