Getting Started with Hadoop: WordCount in Practice
Create a new Maven project
groupId: com.hmzhou.hadoop
artifactId: wordcount
pom.xml
<!-- 1. Package the project as a jar -->
<packaging>jar</packaging>

<!-- 2. Hadoop and JUnit dependencies -->
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>
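Because the packaging type is jar, a plain Maven package run is enough to produce a submittable artifact under target/; the exact jar name depends on the version you set, which is not specified above. On a real cluster the Hadoop dependencies are supplied by the hadoop command at runtime, so they are often marked with provided scope, but the simple setup above works as well.
mvn clean package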
Writing the Mapper
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Note how the generic parameters line up: <input key, input value, output key, output value>
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each input line on spaces and emit (word, 1) for every token
        String[] strings = value.toString().split(" ");
        for (String s : strings) {
            context.write(new Text(s), new IntWritable(1));
        }
    }
}
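Since the pom already pulls in JUnit, the mapper can be unit tested on its own. The sketch below uses MRUnit's MapDriver, which is an extra dependency not listed in the pom above (org.apache.mrunit:mrunit with the hadoop2 classifier); the test class name and the sample line are illustrative assumptions, not part of the original project.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class WordCountMapperTest {

    @Test
    public void mapperEmitsOnePerToken() throws Exception {
        // Feed one line to the mapper and check the (word, 1) pairs it writes, in emission order
        MapDriver.newMapDriver(new WordCountMapper())
                .withInput(new LongWritable(0), new Text("hello hadoop hello"))
                .withOutput(new Text("hello"), new IntWritable(1))
                .withOutput(new Text("hadoop"), new IntWritable(1))
                .withOutput(new Text("hello"), new IntWritable(1))
                .runTest();
    }
}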
Writing the Reducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper (or partial sums from the combiner) for this word
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new IntWritable(count));
    }
}
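The reducer can be checked the same way with MRUnit's ReduceDriver, under the same MRUnit dependency assumption and with the same hypothetical sample data as the mapper test above.
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class WordCountReducerTest {

    @Test
    public void reducerSumsCounts() throws Exception {
        // Two 1s for "hello" should collapse into a single (hello, 2) record
        ReduceDriver.newReduceDriver(new WordCountReducer())
                .withInput(new Text("hello"), Arrays.asList(new IntWritable(1), new IntWritable(1)))
                .withOutput(new Text("hello"), new IntWritable(2))
                .runTest();
    }
}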
Writing the WordCount main method
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // Strip generic Hadoop options (-D, -fs, ...) and keep only the input/output paths
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        // The last argument is the output directory; delete it if it already exists,
        // otherwise FileOutputFormat will refuse to run the job
        rmr(configuration, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(configuration, "WordsCount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        // The reducer doubles as a combiner: summing counts is associative and commutative
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Every argument except the last one is an input path
        for (int i = 0; i < otherArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        if (job.waitForCompletion(true)) {
            // Print the result of the single reduce task
            cat(configuration, otherArgs[otherArgs.length - 1] + "/part-r-00000");
            System.out.println("success");
        } else {
            System.out.println("fail");
        }
    }

    // Recursively delete the given directory if it exists (like "hadoop fs -rm -r")
    private static boolean rmr(Configuration configuration, String dirPath) throws IOException {
        boolean delResult = false;
        Path targetPath = new Path(dirPath);
        FileSystem fs = targetPath.getFileSystem(configuration);
        if (fs.exists(targetPath)) {
            delResult = fs.delete(targetPath, true);
            if (delResult) {
                System.out.println(targetPath + " has been deleted successfully.");
            } else {
                System.out.println(targetPath + " deletion failed.");
            }
        }
        return delResult;
    }

    // Print the contents of an HDFS file to stdout (like "hadoop fs -cat")
    private static void cat(Configuration configuration, String filePath) throws IOException {
        InputStream inputStream = null;
        Path file = new Path(filePath);
        FileSystem fileSystem = file.getFileSystem(configuration);
        try {
            inputStream = fileSystem.open(file);
            // close=false here so System.out stays open; the input stream is closed in finally
            org.apache.hadoop.io.IOUtils.copyBytes(inputStream, System.out, 4096, false);
        } finally {
            if (inputStream != null) {
                org.apache.hadoop.io.IOUtils.closeStream(inputStream);
            }
        }
    }
}
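When the job succeeds, the driver cats part-r-00000 to the console before printing success. For a hypothetical input file containing the single line hello hadoop hello, the printed result would look like this (keys come out sorted, key and count are tab separated):
hadoop	1
hello	2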
Configure the run arguments
Note that the input file wordcount.txt must first be uploaded to HDFS:
hdfs dfs -put wordcount.txt /wordcount/words/input/
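With the input in place, the job can be submitted with hadoop jar. The jar name, the output path, and the com.hmzhou.hadoop package prefix below are assumptions (the listings above do not declare a package); when running from an IDE instead, the same two paths go into the run configuration's program arguments.
hadoop jar wordcount-1.0.jar com.hmzhou.hadoop.WordCount /wordcount/words/input /wordcount/words/output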