Setting Up the Hadoop Eclipse Development Environment
1. With the Hadoop cluster installed and running, download the zip (unpack-and-run) distribution of Eclipse.
2. Download hadoop-eclipse-plugin-2.6.0.jar
Place it in the plugins folder of the Eclipse installation directory.
Download link: https://github.com/winghc/hadoop2x-eclipse-plugin/blob/master/release/hadoop-eclipse-plugin-2.6.0.jar
3. Restart Eclipse. As shown in the figure, a new Hadoop Map/Reduce entry appears:
Window -> Preferences -> Hadoop Map/Reduce -> Hadoop installation directory
4. Fill in the path of the locally extracted Hadoop. This Hadoop build is the Linux one; simply extract the original installation tar.gz package and copy it to the local machine.
5. Download hadoop-common-2.2.0-bin and copy winutils.exe from its bin directory into the hadoop/bin directory.
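If setting the HADOOP_HOME environment variable (step 8) is not convenient, Hadoop's Shell utility also accepts the hadoop.home.dir JVM system property when it looks for winutils.exe. A minimal sketch, assuming Hadoop was extracted to D:/hadoop-2.6.0 (the path is illustrative, adjust it to your machine):

// Sketch: locate winutils.exe via a JVM system property instead of the HADOOP_HOME env var.
// D:/hadoop-2.6.0 is an illustrative path; point it at your own extracted Hadoop directory.
public class WinutilsLocation {
    static {
        // Hadoop's Shell class checks this property before falling back to the HADOOP_HOME env var
        System.setProperty("hadoop.home.dir", "D:/hadoop-2.6.0");
    }

    public static void main(String[] args) {
        // create Configuration / FileSystem objects here as in section 9
        System.out.println("hadoop.home.dir = " + System.getProperty("hadoop.home.dir"));
    }
}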
6. Configure Map/Reduce Locations
Window → Show View → Other → Map/Reduce Locations → click "OK"
On the "Map/Reduce Locations" tab, click the elephant icon with the plus sign, or right-click in the empty area and choose "New Hadoop location…". The "New Hadoop location…" dialog appears; fill in the corresponding settings.
Note: the MR Master and DFS Master settings must match configuration files such as mapred-site.xml and core-site.xml.
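To double-check that the DFS Master and MR Master fields match what the cluster actually uses, a small Java program can print the relevant keys. A minimal sketch, assuming the cluster's core-site.xml and mapred-site.xml have been copied onto the classpath (e.g. into src):

// Sketch: print the settings that the Eclipse DFS Master / MR Master fields should mirror.
// Assumes the cluster's core-site.xml / mapred-site.xml are on the classpath (e.g. copied into src).
import org.apache.hadoop.conf.Configuration;

public class CheckClusterConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The DFS Master host/port comes from fs.defaultFS, e.g. hdfs://192.168.74.137:9000
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
        // In Hadoop 2.x MapReduce runs on YARN; this key is normally "yarn"
        System.out.println("mapreduce.framework.name = " + conf.get("mapreduce.framework.name"));
    }
}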
7. Open Project Explorer to browse the HDFS file system.
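The same listing can also be produced programmatically as a sanity check. A minimal sketch, assuming the NameNode address used later in this document (hdfs://192.168.74.137:9000) and that an /input2 directory exists:

// Sketch: list the contents of an HDFS directory, mirroring what the DFS Locations view shows.
// The address and the /input2 path are taken from the examples in this document; adjust as needed.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListHdfsDirectory {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.74.137:9000"), conf);
        for (FileStatus status : fs.listStatus(new Path("/input2"))) {
            System.out.println(status.getPath() + "  " + status.getLen() + " bytes");
        }
        fs.close();
    }
}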
8. Configure environment variables
8.1 Add HADOOP_HOME
8.2 Adjust JAVA_HOME
It was originally C:\Program Files\Java\jdk1.8.0_144; because the path contains a space, it must be changed to the short form C:\Progra~1\Java\jdk1.8.0_144.
8.3 Update Path
8.4 Restart Windows and verify
Press Win+R and run hadoop version; if the output shown in the figure below appears, the setup is verified.
9. Using the Java API to operate on HDFS
9.1 Using a Hadoop URL
package com.kawa.hdfs;

import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

/**
 * Operate on HDFS through a Hadoop URL
 * @author Administrator
 */
public class HadoopURICatFile {
    // Let the JVM recognize hdfs:// URLs; this factory can only be set once per JVM
    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) {
        InputStream in = null;
        try {
            in = new URL("hdfs://192.168.74.137:9000/input2/file1.txt").openStream();
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
9.2 Using the Hadoop FileSystem API
package com.kawa.hdfs;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Operate on HDFS with the Hadoop FileSystem API
 * @author Administrator
 * Note: when we develop on Windows and operate on HDFS running on Linux, requests are made as the
 * current Windows user by default (Administrator or some other user), and such users usually only
 * have read permission on HDFS. How can we create, delete, or modify files?
 *   Option 1: create a "hadoop" user on Windows and run the HDFS operations under that user
 *   Option 2: open up the permissions of the target files on HDFS (e.g. hadoop fs -chmod 777 /input2/*)
 */
public class HadoopAPICatFile {
    public static void main(String[] args) throws IOException {
        // 1. Obtain a Configuration object
        Configuration conf = new Configuration();
        // First approach: set the default file system by hand on the Configuration object
        // conf.set("fs.defaultFS", "hdfs://192.168.74.137:9000");
        // Second approach: put core-site.xml directly under src; it is loaded automatically, so no
        // Configuration settings are needed; if it is absent, core-default.xml is loaded instead

        // 2. Obtain a FileSystem object
        FileSystem fs = FileSystem.get(conf);

        // 3. Operate on files through the FileSystem object
        boolean flag = fs.rename(new Path("/input2/file1.txt"), new Path("/input2/a.txt"));
        if (flag)
            System.out.println("Rename succeeded!");
        else
            System.out.println("Rename failed!");

        /*
        InputStream in = fs.open(new Path("/input2/file2.txt"));
        IOUtils.copyBytes(in, System.out, 4096);
        IOUtils.closeStream(in);
        */
    }
}
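Besides the two options mentioned in the comment above, the FileSystem API can also open the connection as an explicit user. A minimal sketch, assuming a user named hadoop owns the files on the cluster (the user name and the directory are illustrative):

// Sketch: connect to HDFS as an explicit user instead of the current Windows user.
// "hadoop" is assumed to be the owning user on the cluster; adjust the name and paths as needed.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsAsUser {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.74.137:9000"), conf, "hadoop");
        // With the right user, write operations such as mkdirs are no longer rejected
        boolean created = fs.mkdirs(new Path("/input2/test-dir"));
        System.out.println(created ? "Directory created." : "Directory not created.");
        fs.close();
    }
}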
10. Create a Map/Reduce project
The Hadoop services must be started first (start-dfs.sh / start-yarn.sh).
File -> New -> Project -> Map/Reduce Project -> Next
Write the WordCount class:
package com.kawa.mp;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // The Mapper
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // key and value are the input key/value pair; context collects the output pairs
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // The Reducer
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // reduce() looks much like map(), but the values arrive as an Iterable<IntWritable>,
            // i.e. the input of reduce is one key together with all of its values
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result); // e.g. (World, 2)
        }
    }

    // Configure and submit the job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");   // job name and configuration
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);       // the map function
        job.setCombinerClass(IntSumReducer.class);       // combine partial sums on the map side
        job.setReducerClass(IntSumReducer.class);        // the reduce function
        job.setOutputKeyClass(Text.class);               // output key type
        job.setOutputValueClass(IntWritable.class);      // output value type
        FileInputFormat.addInputPath(job, new Path(args[0]));    // input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Test: export the project as a jar.
Run the command (note that the output directory must not already exist, or the job will fail):

./bin/hadoop jar ~/wordcount.jar com.kawa.mp.WordCount /input2/ /output2/wordcount2
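Once the job finishes, the counts end up in part files under the output directory (part-r-00000 for the first reduce task). A minimal sketch that reads the result back with the FileSystem API, reusing the NameNode address and output path from this document; the exact part file name assumes a single reducer:

// Sketch: read the WordCount result back from HDFS.
// part-r-00000 is the conventional name of the first reduce task's output file.
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class PrintWordCountResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.74.137:9000"), conf);
        InputStream in = fs.open(new Path("/output2/wordcount2/part-r-00000"));
        IOUtils.copyBytes(in, System.out, 4096, true); // true closes the stream afterwards
    }
}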