eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)
说明:之前写过一篇文档https://blog.csdn.net/Allenzyg/article/details/106408236《CDH6.3.1离线部署安装(3个节点)》,这里为什么是CDH6.2.1呢,因为之前安装的CDH6.3.1内存分配的太少(本身电脑内存就很小,所以分配的少,不然电脑内存会被吃完,电脑卡死机),启动之后会很卡,执行太耗时,而这个CDH6.2.1集群是我们公司环境,集群环境还是相当给力的,我使用v*n远程连接,以后的测试和项目就使用公司的CDH集群了。
一、使用eclipse连接CDH6.2.1上的Hadoop
1.在自己电脑(windows系统)上配置jdk、Hadoop的环境变量
2.配置完环境变量后,验证一下:
Win+R,输入cmd
进入到命令窗口:java -version
进入Hadoop的bin目录下:hadoop version
3. 下载eclipse,安装eclipse
4. 把hadoop-eclipse-plugin-2.6.0.jar复制到eclipse的plugins目录下
5.把E:\Hadoop\hadoop-2.7.3\bin目录下的hadoop.dll复制到C:\Windows\System32目录下。原因:如果不添加,会在运行程序时报错Exceptionin thread "main" java.lang.UnsatisfiedLinkError:org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/String;JZ)V
6.修改C:\Windows\System32\drivers\etc\hosts文件,添加远程集群ip和hostname
7.打开eclipse,找到Window--->Preferences--->Hadoop Map/Reduce,填写自己配置环境变量的Hadoop目录
8.点击右下角的小蓝象图标
修改之前
修改之后
点击右下角“finish”,完成。
这里注意:Apache hadoop采用的端口是9000和9001,而Cloudera Manager采用的端口是8020端口
9.完成之后,左侧会出现DFS Locations,这个跟你的hdfs下的目录完全一致
如果左侧没有出现这个DFS Locations,找到window--->show view--->other…下的MapReduce Tools,点击“open”即可。
二、下面开始编写wordcount代码,新建一个project
数据准备:
在本地新建wc.txt,内容如下:
hello nihao
allen I love you
zyg I love you
hello hadoop
bye hadoop
bye allen
bye zyg
上传到hdfs:
代码如下:
package Hadoop; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { /* * 先经过mapper运算,然后才是reducer。 * 内部类:映射器 Mapper<Key_IN, Value_IN, Key_OUT, Value_OUT> * 首先读取源文本 */ public static class WcMap extends Mapper<Object,Text,Text,IntWritable>{ //占位体,1,查到一个就占个坑 private final static IntWritable one=new IntWritable(1); //文本 private Text word=new Text(); //每次调用map方法会传入split中一行数据。 //key:该行数据所在文件中的位置下标,value:该行内容(数据),context:上下文对象,在整个wordcount运算周期内存活。 //这里K、V像这样[K,V] //重写map方法,实现理想效果。WcMap的实例只有一个,但实例的这个map方法却一直在执行,直到读取结束 @Override protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //拆分字符串,返回单词集合。默认以空格和换行/回车拆分 StringTokenizer itr=new StringTokenizer(value.toString()); /*补充: StringTokenizer是一个用来分隔String的应用类,相当于VB(Visual Basic是一种由微软公司开发的结构化的、模块化的、面向对象的、包 含协助开发环境的事件驱动为机制的可视化程序设计语言)的split函数。 StringTokenizer是字符串分隔解析类型,属于:Java.util包。 1.StringTokenizer的构造函数 StringTokenizer(String str):构造一个用来解析str的StringTokenizer对象。 java默认的分隔符是“空格”、“制表符(‘\t’)”、“换行符(‘\n’)”、“回车符(‘\r’)”。 StringTokenizer(String str,String delim):构造一个用来解析str的StringTokenizer对象, 并提供一个指定的分隔符。 StringTokenizer(String str,String delim,boolean returnDelims):构造一个用来解析str的StringTokenizer对象,并提供一个指定的分 隔符,同时,指定是否返回分隔符。 */ //遍历一行的全部单词 while(itr.hasMoreTokens()){ //将文本转为临时Text变量 String curword=itr.nextToken(); word.set(curword); //将单词保存到上下文对象(单词,占位体),输出 context.write(word, one); }
}
} /************************************************************************ * 在Mapper后,Reducer前,有个shuffle过程,会根据k2将对应的v2归并为v2[...] * *************************************************************************/ /** * mapper结束后,执行现在的reducer。 * 内部类:拆分器 Reducer<Key_IN, Value_IN, Key_OUT, Value_OUT> */
public static class WcReduce extends Reducer<Text,IntWritable,Text,IntWritable>{ //计数器。个数统计 private IntWritable times=new IntWritable(); /** * 重写reduce方法,实现理想效果 * WcReduce的实例也只有一个,但实例的这个reduce方法却一直在执行,直到完成统计 * Key:单词。Values:value的集合,也就是[1,1,1,...]。context:上下文对象 * 这里这里K、V像这样[K,V[1,1,1,...]]。每执行一次,key就是一个新单词,对应的values就是其全部占位体 **/ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum=0; //累加value的元素,有多少个占为体1,即有多少个指定单词 for(IntWritable i:values){ sum +=i.get();//对单词为key的计数统计。i是IntWritable类型不能直接加,所以i.get()就是把IntWritable类型变成整数int类型 } times.set(sum);//每次set一下都会清空之前的值 //终于将单词和总个数再次输出 context.write(key, times);//输出到 hdfs:/output中到结果文件 } }
public static void main(String[] args) throws Exception { //HDFS配置 Configuration conf=new Configuration(); //作业(环境) Job job =Job.getInstance(conf); job.setJarByClass(WordCount.class);//执行作业的类 job.setMapperClass(WcMap.class);//读取元数据,执行map运算的类 /* Combiner * 通常,每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。 * combiner的输入输出类型必须和mapper的输出以及reducer的输入类型一致 */ //job.setCombinerClass(WcReduce.class); // 统计数据,执行reducer的类 job.setReducerClass(WcReduce.class); //统计数据,执行reducer的类 job.setOutputKeyClass(Text.class); //设置好输出的key的类型,和context上下文对象write的参数类型一致。 //job.setNumReduceTasks(1); //设置reduce任务的个数 job.setOutputValueClass(IntWritable.class); // 设置输出的value类型 FileInputFormat.addInputPath(job, new Path("hdfs://manager:8020/test/input/wc.txt"));// 元数据路径,(输入的文件或者目录)必须已存在 FileOutputFormat.setOutputPath(job, new Path("hdfs://manager:8020/test/output/wc"));// 统计结果输出路径(输出的文件或者目录),程序自动创建 System.exit(job.waitForCompletion(true)?0:1);// 等待提交作业到集群并完成,才结束程序。等待job完成,若系统运行成功, 则返回0 ,否则返回1 } } |
执行结果如下: