eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

说明:之前写过一篇文档https://blog.csdn.net/Allenzyg/article/details/106408236《CDH6.3.1离线部署安装(3个节点)》,这里为什么是CDH6.2.1呢,因为之前安装的CDH6.3.1内存分配的太少(本身电脑内存就很小,所以分配的少,不然电脑内存会被吃完,电脑卡死机),启动之后会很卡,执行太耗时,而这个CDH6.2.1集群是我们公司环境,集群环境还是相当给力的,我使用v*n远程连接,以后的测试和项目就使用公司的CDH集群了。

一、使用eclipse连接CDH6.2.1上的Hadoop

1.在自己电脑(windows系统)上配置jdk、Hadoop的环境变量

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

2.配置完环境变量后,验证一下:

Win+R,输入cmd

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

进入到命令窗口:java -version

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

进入Hadoop的bin目录下:hadoop version

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

3. 下载eclipse,安装eclipse
4. 把hadoop-eclipse-plugin-2.6.0.jar复制到eclipse的plugins目录下

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

5.把E:\Hadoop\hadoop-2.7.3\bin目录下的hadoop.dll复制到C:\Windows\System32目录下。原因:如果不添加,会在运行程序时报错Exceptionin thread "main" java.lang.UnsatisfiedLinkError:org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/String;JZ)V
6.修改C:\Windows\System32\drivers\etc\hosts文件,添加远程集群ip和hostname
eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

7.打开eclipse,找到Window--->Preferences--->Hadoop Map/Reduce,填写自己配置环境变量的Hadoop目录

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

8.点击右下角的小蓝象图标

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

修改之前

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

修改之后

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

点击右下角“finish”,完成。
这里注意:Apache hadoop采用的端口是9000和9001,而Cloudera Manager采用的端口是8020端口
9.完成之后,左侧会出现DFS Locations,这个跟你的hdfs下的目录完全一致
eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

如果左侧没有出现这个DFS Locations,找到window--->show view--->other…下的MapReduce Tools,点击“open”即可。

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

二、下面开始编写wordcount代码,新建一个project

数据准备:

在本地新建wc.txt,内容如下:

hello nihao

allen I love you

zyg I love you

hello hadoop

bye hadoop

bye allen

bye zyg

上传到hdfs:

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

代码如下:

package Hadoop;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

       /*   

        * 先经过mapper运算,然后才是reducer。    

        * 内部类:映射器 Mapper<Key_IN, Value_IN, Key_OUT, Value_OUT>    

        * 首先读取源文本    

        */

    public static class WcMap extends Mapper<Object,Text,Text,IntWritable>{

         //占位体,1,查到一个就占个坑

         private final static IntWritable one=new IntWritable(1);

         //文本

         private Text word=new Text();

         //每次调用map方法会传入split中一行数据。

         //key:该行数据所在文件中的位置下标,value:该行内容(数据),context:上下文对象,在整个wordcount运算周期内存活。

         //这里K、V像这样[K,V]

         //重写map方法,实现理想效果。WcMap的实例只有一个,但实例的这个map方法却一直在执行,直到读取结束

         @Override

         protected void map(Object key, Text value,

                  Mapper<Object, Text, Text, IntWritable>.Context context)

                  throws IOException, InterruptedException {

         //拆分字符串,返回单词集合。默认以空格和换行/回车拆分

             StringTokenizer itr=new StringTokenizer(value.toString());

         /*补充:

           StringTokenizer是一个用来分隔String的应用类,相当于VB(Visual Basic是一种由微软公司开发的结构化的、模块化的、面向对象的、包            含协助开发环境的事件驱动为机制的可视化程序设计语言)的split函数。

           StringTokenizer是字符串分隔解析类型,属于:Java.util包。

           1.StringTokenizer的构造函数

           StringTokenizer(String str):构造一个用来解析str的StringTokenizer对象。

           java默认的分隔符是“空格”、“制表符(‘\t’)”、“换行符(‘\n’)”、“回车符(‘\r’)”。

           StringTokenizer(String str,String delim):构造一个用来解析str的StringTokenizer对象, 并提供一个指定的分隔符。

           StringTokenizer(String str,String delim,boolean returnDelims):构造一个用来解析str的StringTokenizer对象,并提供一个指定的分             隔符,同时,指定是否返回分隔符。

          */

         //遍历一行的全部单词

             while(itr.hasMoreTokens()){

         //将文本转为临时Text变量

                  String curword=itr.nextToken();

                  word.set(curword);

         //将单词保存到上下文对象(单词,占位体),输出

                  context.write(word, one);

             }

 

         }       

 

    }

   /************************************************************************

    *  在Mapper后,Reducer前,有个shuffle过程,会根据k2将对应的v2归并为v2[...]  *

    *************************************************************************/

   /**

    * mapper结束后,执行现在的reducer。

    * 内部类:拆分器 Reducer<Key_IN, Value_IN, Key_OUT, Value_OUT>

    */

 

public static class WcReduce extends Reducer<Text,IntWritable,Text,IntWritable>{

//计数器。个数统计

    private IntWritable times=new IntWritable();

      /**

        * 重写reduce方法,实现理想效果

        * WcReduce的实例也只有一个,但实例的这个reduce方法却一直在执行,直到完成统计

        * Key:单词。Values:value的集合,也就是[1,1,1,...]。context:上下文对象

        * 这里这里K、V像这样[K,V[1,1,1,...]]。每执行一次,key就是一个新单词,对应的values就是其全部占位体

       **/

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values,

             Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

 

         int sum=0;

        //累加value的元素,有多少个占为体1,即有多少个指定单词

         for(IntWritable i:values){

             sum +=i.get();//对单词为key的计数统计。i是IntWritable类型不能直接加,所以i.get()就是把IntWritable类型变成整数int类型

         }

         times.set(sum);//每次set一下都会清空之前的值

        //终于将单词和总个数再次输出

         context.write(key, times);//输出到 hdfs:/output中到结果文件

    }

}

 

    public static void main(String[] args) throws Exception {

        //HDFS配置

         Configuration conf=new Configuration();

        //作业(环境)

         Job job =Job.getInstance(conf);

         job.setJarByClass(WordCount.class);//执行作业的类

         job.setMapperClass(WcMap.class);//读取元数据,执行map运算的类

         /* Combiner

     * 通常,每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

     * combiner的输入输出类型必须和mapper的输出以及reducer的输入类型一致

          */

         //job.setCombinerClass(WcReduce.class);    // 统计数据,执行reducer的类

         job.setReducerClass(WcReduce.class);                  //统计数据,执行reducer的类

         job.setOutputKeyClass(Text.class);                         //设置好输出的key的类型,和context上下文对象write的参数类型一致。

         //job.setNumReduceTasks(1);                                 //设置reduce任务的个数

         job.setOutputValueClass(IntWritable.class);            // 设置输出的value类型

         FileInputFormat.addInputPath(job, new Path("hdfs://manager:8020/test/input/wc.txt"));// 元数据路径,(输入的文件或者目录)必须已存在

         FileOutputFormat.setOutputPath(job, new Path("hdfs://manager:8020/test/output/wc"));// 统计结果输出路径(输出的文件或者目录),程序自动创建

         System.exit(job.waitForCompletion(true)?0:1);// 等待提交作业到集群并完成,才结束程序。等待job完成,若系统运行成功, 则返回0 ,否则返回1

    }

}

执行结果如下:

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)

eclipse连接CDH6.2.1集群的Hadoop集群(wordcount简单测试)