Setting Up a Hadoop Eclipse Development Environment

1. With a working Hadoop cluster already installed, download the standalone (unzip-and-run) distribution of Eclipse.

2. Download hadoop-eclipse-plugin-2.6.0.jar

Place it in the plugins folder of the Eclipse installation directory.

Download link: https://github.com/winghc/hadoop2x-eclipse-plugin/blob/master/release/hadoop-eclipse-plugin-2.6.0.jar


3. Restart Eclipse. A new Hadoop Map/Reduce preference page now appears under:

Window -> Preferences -> Hadoop Map/Reduce -> Hadoop installation directory


4. Fill in the path of the locally extracted Hadoop. This Hadoop build is the Linux one; simply extract the original tar.gz installation package and copy it to the local machine.

5. Download hadoop-common-2.2.0-bin and copy winutils.exe from its bin directory into the local hadoop/bin directory. Hadoop's Windows-side code looks for this native helper, so HDFS and MapReduce clients fail at startup on Windows without it.


6. Configure Map/Reduce Locations

Window -> Show View -> Other -> Map/Reduce Locations -> click "OK"

In the "Map/Reduce Locations" tab, click the elephant-with-plus icon, or right-click the empty area and choose "New Hadoop location…". In the "New Hadoop location…" dialog, fill in the settings.

Note: the MR Master and DFS Master settings must match mapred-site.xml, core-site.xml, and the other cluster configuration files. A reference snippet follows.
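In particular, the DFS Master host and port must equal fs.defaultFS in core-site.xml. A minimal sketch of the relevant property, using the cluster address that appears throughout this tutorial (your host and port may differ):

&lt;!-- core-site.xml: the DFS Master in Eclipse must point at this address --&gt;
&lt;property&gt;
    &lt;name&gt;fs.defaultFS&lt;/name&gt;
    &lt;value&gt;hdfs://192.168.74.137:9000&lt;/value&gt;
&lt;/property&gt;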


7. Open Project Explorer to browse the HDFS file system; the location configured above appears as a DFS Locations tree.


8. Configure the Windows environment variables

8.1 Add HADOOP_HOME, pointing at the local Hadoop directory from step 4


8.2 Modify JAVA_HOME

It was originally C:\Program Files\Java\jdk1.8.0_144; because the path contains a space, which Hadoop's Windows scripts cannot handle, it must be changed to C:\Progra~1\Java\jdk1.8.0_144 (Progra~1 is the 8.3 short name for Program Files).
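The short name can differ between machines. A quick way to check it is the /x switch of dir in a command prompt, which prints each entry's 8.3 short name alongside its long name:

dir /x C:\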


8.3 Modify Path (typically by appending %HADOOP_HOME%\bin, so that the hadoop command can be found)

8.4 Restart Windows and verify

Press Win+R, start a command prompt (cmd), and run hadoop version; if it prints the Hadoop version information, the setup succeeded.



9. Using the Java API to Operate on HDFS

9.1 Using a Hadoop URL

package com.kawa.hdfs;

import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

/**
 * Read a file from HDFS through a Hadoop URL.
 * @author Administrator
 */
public class HadoopURICatFile {

    // Teach the JVM to recognize hdfs:// URLs
    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) {
        InputStream in = null;
        try {
            in = new URL("hdfs://192.168.74.137:9000/input2/file1.txt").openStream();
            // Copy the stream to stdout in 4096-byte chunks without closing it
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
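One caveat with this approach: java.net.URL.setURLStreamHandlerFactory may be called at most once per JVM, so it cannot be used when some other component of the application has already installed a URL stream handler factory. The FileSystem API in 9.2 has no such restriction.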

9.2 Using the Hadoop FileSystem API

package com.kawa.hdfs;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Operate on HDFS through the Hadoop FileSystem API.
 * @author Administrator
 *
 * Note: when developing on Windows against HDFS running on Linux, the client
 * connects as the current Windows user by default, so it may be Administrator
 * or some other user, and such users usually have read-only permission on HDFS.
 * How can we create, delete, and modify files then?
 *   Option 1: create a hadoop user on Windows and run the HDFS operations under that user.
 *   Option 2: open up permissions on the HDFS files (e.g. hadoop fs -chmod 777 /input2/*).
 */
public class HadoopAPICatFile {

    public static void main(String[] args) throws IOException {
        // 1. Build the Configuration object
        Configuration conf = new Configuration();
        // Option A: set the default file system by hand on the Configuration
        //conf.set("fs.defaultFS", "hdfs://192.168.74.137:9000");
        // Option B: put core-site.xml directly under src; it is loaded automatically,
        // so nothing needs to be set on the Configuration object. If absent, only
        // the built-in core-default.xml is loaded.

        // 2. Obtain the FileSystem object
        FileSystem fs = FileSystem.get(conf);

        // 3. Use the FileSystem object to manipulate files
        boolean flag = fs.rename(new Path("/input2/file1.txt"), new Path("/input2/a.txt"));
        if (flag)
            System.out.println("Rename succeeded!");
        else
            System.out.println("Rename failed!");

        /*InputStream in = fs.open(new Path("/input2/file2.txt"));
        IOUtils.copyBytes(in, System.out, 4096);
        IOUtils.closeStream(in);*/
    }
}
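The same FileSystem handle supports the full set of HDFS operations. Below is a minimal sketch of two more of them, creating a file and listing a directory; the class name HadoopAPIWriteList and the file name hello.txt are illustrative, the cluster address is the one used throughout this tutorial, and the permission caveat from the class comment above applies to the write:

package com.kawa.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopAPIWriteList {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.74.137:9000");
        FileSystem fs = FileSystem.get(conf);

        // Create (or overwrite) a file and write one line into it
        FSDataOutputStream out = fs.create(new Path("/input2/hello.txt"), true);
        out.writeBytes("hello hadoop\n");
        out.close();

        // List the directory to confirm the write
        for (FileStatus status : fs.listStatus(new Path("/input2"))) {
            System.out.println(status.getPath() + "\t" + status.getLen());
        }

        fs.close();
    }
}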

10. Create a Map/Reduce Project

The Hadoop services must be started first (start-dfs.sh / start-yarn.sh); a quick check follows.
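To confirm on the cluster that the daemons actually came up, run jps there (it ships with the JDK and lists running Java processes); after start-dfs.sh and start-yarn.sh it should show, with their PIDs, processes such as NameNode, DataNode, SecondaryNameNode, ResourceManager, and NodeManager, though the exact list depends on your deployment:

jps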

File -> New -> Project -> Map/Reduce Project -> Next

Write the WordCount class:

package com.kawa.mp;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // The Mapper
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // key and value form the input pair; context collects the output pairs
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // The Reducer
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // reduce mirrors map, but its values arrive as an iterator (Iterable<IntWritable>):
            // the input to reduce is one key together with all the values emitted for that key
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result); // output looks like (World, 2)
        }
    }

    // Configure and submit the job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");  // job name and configuration
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);  // the map class
        job.setCombinerClass(IntSumReducer.class);  // combiner for map-side aggregation
        job.setReducerClass(IntSumReducer.class);   // the reduce class
        job.setOutputKeyClass(Text.class);          // output key type
        job.setOutputValueClass(IntWritable.class); // output value type
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Test: export the project as a jar file.

Then run:

./bin/hadoop jar ~/wordcount.jar com.kawa.mp.WordCount /input2/ /output2/wordcount2
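Note that the output directory (/output2/wordcount2 here) must not already exist when the job is submitted, or FileOutputFormat will abort it. Once the job completes, the counts can be inspected in the output directory; with a single reducer they land in a part-r-00000 file:

./bin/hadoop fs -cat /output2/wordcount2/part-r-00000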