Implementing WordCount with the MapReduce Programming Model and Running It on YARN

## This guide assumes the Hadoop cluster's HDFS is already set up, with the NameNode and DataNodes starting and running normally.

  • First, set up the YARN platform; editing the yarn-site.xml configuration file under the Hadoop installation is all it takes:

<configuration>
    <!-- Where the ResourceManager runs -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hdp-01</value>
    </property>
    <!-- NodeManager auxiliary service required by MapReduce's shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- Memory available to containers on this node; give it at least about 1.5 GB or jobs will fail -->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>2048</value>
    </property>
    <!-- Number of CPU cores available to containers -->
    <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>2</value>
    </property>
</configuration>


Then copy yarn-site.xml into the same directory on every other node of the cluster, e.g. scp yarn-site.xml hdp-02:$PWD

Run start-yarn.sh, then check on the Linux side (for example with jps) that the ResourceManager and NodeManager processes started successfully, or open http://hdp-01:8088/ in a browser; if the page loads and lists the NodeManagers, YARN is up.
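If you prefer checking from code rather than the web UI, the YARN client API can list the registered NodeManagers. Below is a minimal sketch; the class name YarnNodeCheck is my own, and it assumes the hadoop-yarn-client jar is on the classpath:

package mapreduce;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class YarnNodeCheck {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("yarn.resourcemanager.hostname", "hdp-01");

		YarnClient yarnClient = YarnClient.createYarnClient();
		yarnClient.init(conf);
		yarnClient.start();
		try {
			// Only nodes in RUNNING state can accept containers
			List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
			for (NodeReport node : nodes) {
				System.out.println(node.getNodeId() + " memory="
						+ node.getCapability().getMemory() + "MB vcores="
						+ node.getCapability().getVirtualCores());
			}
		} finally {
			yarnClient.stop();
		}
	}
}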


# Eclipse is assumed to be configured so it can already operate on the HDFS file system. For details on that development-environment setup, see: Using the Java API to Operate on the HDFS File System.

  • Import the missing YARN and MapReduce jars, along with their dependency packages (lib), into Eclipse.


WordCountMapper.java

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Convert the incoming value (one line of input) to a String
		String line = value.toString();
		String[] words = line.split(" ");
		// For each word in the line, emit the word as a Text key with an IntWritable value of 1
		for (String word : words) {
			context.write(new Text(word), new IntWritable(1));
		}
	}
	
}
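A small optional refinement: context.write() serializes the key and value immediately, so the Text and IntWritable objects can be reused across calls instead of being allocated for every word. A sketch of the same mapper with reused writables:

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	// Reused across map() calls; context.write() serializes them right away,
	// so mutating them afterwards is safe
	private final Text outKey = new Text();
	private final IntWritable one = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		for (String word : value.toString().split(" ")) {
			outKey.set(word);
			context.write(outKey, one);
		}
	}
}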

WordCountReducer.java

package mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text word, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		// Sum the 1s the mappers emitted for this word
		int count = 0;
		Iterator<IntWritable> iterator = values.iterator();
		while (iterator.hasNext()) {
			IntWritable value = iterator.next();
			count += value.get();
		}
		context.write(word, new IntWritable(count));
	}
}
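Because summing counts is associative and commutative, this reducer can also double as a combiner, pre-aggregating counts on the map side and shrinking the shuffle. If you want that, one extra line in the driver is enough (an optional addition, not part of the original setup):

		// Optional: run the reducer as a map-side combiner to cut shuffle traffic;
		// correct here because addition is associative and commutative
		job.setCombinerClass(WordCountReducer.class);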

JobSubmit.java: submits the job from a Windows machine; the program must first be exported as a jar to the location referenced in the code (E:\jar\thy.jar below).

package mapreduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Client program that submits the MapReduce job:
 * packages the parameters this job needs at run time,
 * then talks to YARN to get the MapReduce program started.
 */
public class JobSubmit {

	public static void main(String[] args) throws Exception {
		
		// Set the JVM property in code so the job is submitted to the Linux
		// cluster as root; it can also be set in Eclipse's Run Configuration
		System.setProperty("HADOOP_USER_NAME", "root");
		
		// Cluster connection and framework settings
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
		conf.set("mapreduce.framework.name", "yarn");
		conf.set("yarn.resourcemanager.hostname", "hdp-01");
		// Cross-platform flag required when submitting from Windows
		conf.set("mapreduce.app-submission.cross-platform", "true");
		Job job = Job.getInstance(conf);
		
		// Where the exported jar lives on the local (Windows) disk
		job.setJar("E:\\jar\\thy.jar");
		//job.setJarByClass(JobSubmit.class);
		
		
		// The mapper and reducer classes this job runs
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		// Output key/value types of the mapper, then of the final (reducer) output
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// Connect to HDFS as root and remove any leftover output directory
		FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), conf, "root");
		Path output = new Path("/hdfsDemo/output");
		if(fs.exists(output)) {
			fs.delete(output,true);
		}
		
		// Path of the input data set and path for the results
		FileInputFormat.setInputPaths(job, new Path("/hdfsDemo/input"));
		FileOutputFormat.setOutputPath(job, output); // note: the output path must not already exist
		
		// Number of reduce tasks to start
		job.setNumReduceTasks(2);
		
		// Submit the job to YARN and wait for it to finish
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:-1);
	}

}

JobSubmit2.java: runs on Linux. Launch the exported jar's main class, i.e. JobSubmit2, directly with the hadoop jar command; hadoop jar automatically adds every jar and configuration file under the Hadoop installation directory to the classpath, which is why no connection settings are needed in the code.

Finally, upload the program's jar to any node of the cluster and run: hadoop jar <the jar you uploaded> <package>.JobSubmit2

In my case: hadoop jar thy.jar mapreduce.JobSubmit2

package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmit2 {
	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		// Pass conf in; Job.getInstance() without arguments would ignore it
		Job job = Job.getInstance(conf);

		job.setJarByClass(JobSubmit2.class);

		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path("/hdfsDemo/input"));
		FileOutputFormat.setOutputPath(job, new Path("/hdfsDemo/output"));

		job.setNumReduceTasks(2);
		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : -1);
	}
}
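One possible refinement, since JobSubmit2 hardcodes /hdfsDemo/input and /hdfsDemo/output: take the paths from the command line instead, so the same jar works for any data set. A sketch (the argument order is my choice):

		// Usage sketch: hadoop jar thy.jar mapreduce.JobSubmit2 <input path> <output path>
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));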

Note: if you export the jar without Maven, do not export the entire project as the jar, or you will hit path-related errors; the exact cause is unclear.



When the data set is small, running locally is very fast, so during development you can debug against a subset of the data in local mode. The code is as follows:

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Runs MapReduce locally on Windows; useful for testing.
 * @author THY
 */
public class JobSubmitLocalRun {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Use the local file system and the local job runner
		conf.set("fs.defaultFS", "file:///");
		conf.set("mapreduce.framework.name", "local");
		Job job = Job.getInstance(conf);

		job.setJarByClass(JobSubmitLocalRun.class);

		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// Input and output paths point at the local disk
		FileInputFormat.setInputPaths(job, new Path("E:\\WindowsLocalRunMR\\input"));
		FileOutputFormat.setOutputPath(job, new Path("E:\\WindowsLocalRunMR\\output"));

		job.setNumReduceTasks(2);
		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : -1);
	}

}
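One caveat for local mode on Windows: Hadoop's local file-system code looks for winutils.exe, and without it the job can fail before the mapper even runs. If that happens, pointing hadoop.home.dir at a Hadoop directory containing bin\winutils.exe, at the top of main(), usually fixes it (the E:\hadoop path below is just an example):

		// Assumption: winutils.exe lives in E:\hadoop\bin; set this before any Hadoop class loads
		System.setProperty("hadoop.home.dir", "E:\\hadoop");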

The output of a successful run is shown below:

Result on Linux:


Result in Eclipse:
