Windows-eclipse远程开发Hadoop的MapReduce程序

本教程介绍如何在Windows系统使用eclipse开发工具进行Hadoop的MapReduce程序的开发。
如果还没有配置Hadoop的集群环境,请参考我的文章:Hadoop集群环境配置

前期准备

  1. Hadoop2.7.3.tar.gz,密码:bvyd

  2. Windows上的eclipse-Hadoop2.7.3插件,密码:65bz

环境配置

配置Hadoop环境变量: 添加 HADOOP_HOME=D:\Softwares\hadoop2.7.3 修改 PATH=;%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin;
1
2
3
4
配置Hadoop环境变量:
 
添加 HADOOP_HOME=D:\Softwares\hadoop2.7.3
修改 PATH=;%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin;

添加eclipse插件:
将下载的Hadoop-eclipse-plugins.jar复制到eclipse的plugins目录下,重启eclipse,可以看到左边的项目浏览器中出现了 DFS Locations选项,
打开 window->preference->hadoop MapReduce :添加Hadoop的存放位置。(将解压出来的bin目录下的文件拷贝到Hadoop的bin目录下,同时可以把hadoop.dll文件拷贝到system32目录下)Windows-eclipse远程开发Hadoop的MapReduce程序打开透视图,Map-Reduce:配置连接服务器集群:Windows-eclipse远程开发Hadoop的MapReduce程序添加新链接:Windows-eclipse远程开发Hadoop的MapReduce程序配置服务器IP和端口号:Windows-eclipse远程开发Hadoop的MapReduce程序成功连接服务器后,可以看到服务器上文件系统的文件列表(如果有的话):Windows-eclipse远程开发Hadoop的MapReduce程序

创建MapReduce项目

新建项目,选择Map/Reduce project:Windows-eclipse远程开发Hadoop的MapReduce程序新建WordCount类:Windows-eclipse远程开发Hadoop的MapReduce程序

package com.hadoop.wordcount; import java.io.IOException; import java.io.PrintStream; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private static final IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(this.word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } this.result.set(sum); context.write(key, this.result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.hadoop.wordcount;
 
import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
 
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
 
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
 
public void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text,
IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
this.result.set(sum);
context.write(key, this.result);
}
}
 
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
 
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

在运行项目前,把服务器集群中的 hdfs-site.xml、core-site.xml、log4j.properties 拷贝到项目的src目录下,配置core-site.xml 文件:Windows-eclipse远程开发Hadoop的MapReduce程序接着配置项目运行参数:Windows-eclipse远程开发Hadoop的MapReduce程序注意:在服务器集群的文件系统中需要提前把需要的文件拷贝到对应的目录下,否则程序运行会无法找到输入文件路径:
在 Hadoop上运行程序, run as Hadoop:Windows-eclipse远程开发Hadoop的MapReduce程序Windows-eclipse远程开发Hadoop的MapReduce程序运行出错:Windows-eclipse远程开发Hadoop的MapReduce程序这是因为本地系统用户没有权限访问服务器的文件路径,在服务器集群的 hdfs-site.xml 中添加属性:

<property> <name>dfs.permissions</name> <value>false</value> </property>
1
2
3
4
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
//重启服务器集群 $ stop-dfs.sh $ start-dfs.sh
1
2
3
4
5
//重启服务器集群
 
$ stop-dfs.sh
 
$ start-dfs.sh

重新运行项目,执行成功,可以手动刷新本地的 dfs location 查看输出目录的文件,也可以通过 shell 查看服务器的输出:

$ hdfs dfs -cat output/*
1
$ hdfs dfs -cat output/*

注意,如果服务器集群的用户与本地用户不一样,那么会在服务器上自动创建对应的用户目录:Windows-eclipse远程开发Hadoop的MapReduce程序同样的,在执行程序时,输出结果的路径不能存在,你可以选择先进行删除或者更换输出目录,也可以通过修改代码来自动删除输出目录:Windows-eclipse远程开发Hadoop的MapReduce程序Windows-eclipse远程开发Hadoop的MapReduce程序