Day 17 – IDEA – Maven – AWK – Simple MapReduce Examples
1. IDEA
Installation
Download address: click here
IDEA comes in an Ultimate edition and a Community edition. The Ultimate edition requires a paid license, while the Community edition is completely free; the two differ in the features they support. The Ultimate edition is recommended.
After downloading, simply click through the installer; options such as the installation path can be changed along the way.
Activation
After the Ultimate edition is installed it needs to be activated; for the activation steps, click here and follow the process described on that page.
Basic configuration
Settings are under File – Settings:
- Font size, colors, etc.: Editor – Font / Color Scheme
- Code completion: Editor – General – Code Completion; matching only on the first letter is recommended
- File encoding: Editor – File Encodings; set the three encoding options shown there to UTF-8
- Change the JDK version used for compilation: Build, Execution, Deployment – Compiler – Java Compiler
- Change the module JDK inside a project: File – Project Structure, then select Modules
Common shortcuts
- Ctrl+Shift+Enter: complete the current statement
- Ctrl+Alt+L: reformat code (conflicts with the QQ shortcut)
- Alt+Enter: import packages, apply quick fixes
- Ctrl+Alt+T: surround code with a block, e.g. if, if/else, try/catch
- Ctrl+Alt+V: extract a variable; e.g. typing new ArrayList(); is completed to ArrayList<Object> objects = new ArrayList<>();
- Ctrl+X: cut (delete) the current line
- Ctrl+D: duplicate the current line below
- Ctrl+/ or Ctrl+Shift+/: comment (// or /* */)
- Shift+Enter: insert a new line below
- Ctrl+O: override methods
- Ctrl+I: implement methods
2. Maven
Introduction
Apache Maven is a software project management and comprehension tool. Based on the concept of a project object model (POM), Maven can manage a project's build, reporting, and documentation from a central piece of information.
Download
Version 3.3.9 is recommended; download apache-maven-3.3.9-bin.tar.gz from the official site.
Extract and install
Simply extract the archive to an installation directory, e.g. D:\software\apache-maven-3.3.9.
Configure environment variables
Right-click This PC – Properties – Advanced system settings – Environment Variables
- Under System variables, create a new variable MVN_HOME with the value D:\software\apache-maven-3.3.9
- Edit the Path variable and add %MVN_HOME%\bin
Test
Open cmd and run mvn -version; if Maven's version information is printed, the installation and the environment variables are set up correctly.
Configure the Maven local repository
Open settings.xml in the conf folder of the Maven installation directory. By default the local repository lives in the .m2\repository folder under the user's home directory, but since the home directory is usually on the C: drive and downloaded jars gradually eat up system-drive space, it is better to point the repository at another drive, for example:
<localRepository>D:\m2\repository</localRepository>
Configure Maven in IDEA
File – Settings – Build, Execution, Deployment – Maven
Set Maven home directory to your local installation directory (IDEA ships with two bundled Maven versions, but they are awkward to configure, so use your own installation).
For User settings file, tick Override and select your own settings.xml; once it is selected, Local repository automatically switches to the local repository directory configured in settings.xml in the previous step.
Note: Settings only applies to the current project. Repeat the same configuration under Other Settings → Default Settings (or Settings for New Projects) so that newly created projects keep these values instead of falling back to the defaults.
Maven dependency search (e.g. https://mvnrepository.com/)
3. IDEA Projects
Create a Maven project
File – New – Project, choose Maven, and pick the SDK version (you can click New and point it at the local JDK installation directory).
pom.xml
When a Maven project is created, a pom.xml is generated automatically. Dependencies, packaging options, and other build settings are configured in this file.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.szbd</groupId>
    <artifactId>sz_hadoop</artifactId>
    <version>1.0</version>
    <!-- jar dependencies of the project -->
    <dependencies>
    </dependencies>
</project>
To add a jar dependency, look the artifact up on the Maven dependency search site and copy the generated configuration into the <dependencies> tag.
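For example, the MapReduce code later in these notes uses the Hadoop client classes and fastjson. A minimal dependencies block might look like the following sketch; the version numbers are only illustrative and should be matched to your cluster:

<dependencies>
    <!-- Hadoop client libraries; match the version to the cluster (2.7.3 is only an example) -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
    </dependency>
    <!-- fastjson, used to parse JSON strings in the first MapReduce example -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>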
4. The awk command
Introduction
awk takes its name from the initials of its creators, Alfred Aho, Peter Weinberger and Brian Kernighan. AWK is in fact a language in its own right, the AWK programming language, which its three creators formally described as a "pattern scanning and processing language". It lets you write short programs that read input files, sort data, process it, perform calculations on the input, generate reports, and much more.
Basic awk command format
data_source | awk -F "" 'BEGIN{} {}… END{}'
awk -F "" 'BEGIN{} {}… END{}' data_source
awk -F "" '{}… END{}' data_source
awk -F "" '{}…' data_source
(-F specifies the field separator.)
BEGIN{} is similar to the setup() function in MapReduce: it runs exactly once, before the main {} blocks.
END{} is similar to the cleanup() function in MapReduce: it runs exactly once, after the main {} blocks.
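To make the analogy concrete, here is a minimal skeleton Mapper (a sketch for illustration only, not part of the examples below): setup() plays the role of BEGIN{}, map() the per-record {} block, and cleanup() the END{} block.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LifecycleMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void setup(Context context) {
        // like awk's BEGIN{}: runs once, before any record is processed
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // like awk's main {} block: runs once per input record
    }

    @Override
    protected void cleanup(Context context) {
        // like awk's END{}: runs once, after the last record
    }
}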
awk examples
- Filtering (print the users whose UID is greater than 500):
cat /etc/passwd | awk -F ":" '{if($3>500)print $1,$2,$3}'
- Bucketed counts and percentages
Data:
a 300 200 300
b 800 900 200
c 500 800 900
d 900 900 300
Command:
cat ./pc | awk -F "\t" '
BEGIN{
    ge0lt1000=0
    ge1000lt1500=0
    ge1500lt2000=0
    ge2000=0
    print "zone\tcount\tpercent"
}{
    if(($2+$3+$4) < 1000){
        ge0lt1000++
    } else if(($2+$3+$4) < 1500){
        ge1000lt1500++
    } else if(($2+$3+$4) < 2000){
        ge1500lt2000++
    } else {
        ge2000++
    }
}END{
    print "ge0lt1000",ge0lt1000,ge0lt1000/NR*100.0"%"
    print "ge1000lt1500",ge1000lt1500,ge1000lt1500/NR*100.0"%"
    print "ge1500lt2000",ge1500lt2000,ge1500lt2000/NR*100.0"%"
    print "ge2000",ge2000,ge2000/NR*100.0"%"
}
'
Result:
zone count percent
ge0lt1000 1 25%
ge1000lt1500 0 0%
ge1500lt2000 1 25%
ge2000 2 50%
5. Simple MapReduce examples
Parsing a JSON string
Requirement:
Input data:
{"add":"183.160.122.237|20171030080000015","ods":{"hed":"20171030075958865|011001103233576|60427fbe7d66,60427f8b733b|NewTV02SkyworthNews|192.168.1.6|4|6|60000185|3","dt":"20171030075958865|4|7,3404304,19458158,0,1,60000,0,2430|"}}
Output:
183.160.122.237|20171030080000015 20171030075958865|011001103233576|60427fbe7d66,60427f8b733b|NewTV02SkyworthNews|192.168.1.6|4|6|60000185|3 20171030075958865|4|7,3404304,19458158,0,1,60000,0,2430|
Code:
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Map;

public class JSONanalysis {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Parse the whole line as JSON, then parse the nested "ods" object
            Map<String, Object> map = JSONObject.parseObject(value.toString(), Map.class);
            Map<String, String> ods = JSONObject.parseObject(map.get("ods").toString(), Map.class);
            // Concatenate the three fields, separated by tabs
            String result = map.get("add") + "\t" + ods.get("hed") + "\t" + ods.get("dt");
            context.write(new Text(result), NullWritable.get());
        }
    }

    // No reducer is registered in the driver below, so the default identity
    // reducer simply writes the map output unchanged.
    public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        /*@Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }*/
    }

    // Driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration object
        Configuration conf = new Configuration();
        // 2. Configure it (omit if not needed)
        conf.set("fs.defaultFS", "hdfs://sz01:8020/");
        // 3. Get the job object
        Job job = Job.getInstance(conf, "jsonanalysis");
        // 4. Set the main class of the job
        job.setJarByClass(JSONanalysis.class);
        // 5. Configure the map stage
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 6. Set input and output paths
        setArgs(job, args);
        // 7. Submit the job and print progress
        int isok = job.waitForCompletion(true) ? 0 : 1;
        // Exit the whole job
        System.exit(isok);
    }

    /**
     * Handle the job arguments (input and output paths).
     * @param job
     * @param args
     */
    public static void setArgs(Job job, String[] args) {
        try {
            if (args.length != 2) {
                System.out.println("argument count is wrong!!!");
                System.out.println("Usage: yarn jar *.jar JSONanalysis /inputdata /outputdata");
                System.exit(1);
            }
            // Input path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // Delete the output directory if it already exists
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path op = new Path(args[1]);
            if (fs.exists(op)) {
                fs.delete(op, true);
            }
            // Output path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
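A note on running it (the same applies to the later examples): package the project, e.g. with mvn package, which for the pom above produces sz_hadoop-1.0.jar, and submit it roughly as the usage string in setArgs suggests: yarn jar sz_hadoop-1.0.jar JSONanalysis /inputdata /outputdata, where the two arguments are the HDFS input and output paths. Since the mapper uses fastjson, which is not on Hadoop's own classpath, the submitted jar has to bundle that dependency (or it must be made available to the cluster in some other way).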
Multi-file output
Requirement
Data:
hello qianfeng qianfeng world heloo
Hello Hi Hello World
QF QQ
163.com
15900001111 17900001111
@163.com
@189.com
$1100000
*[a-z]
Output:
part-r-00000
hello 1
heloo 1
qianfeng 2
world 1
part-r-00001
Hello 2
Hi 1
QF 1
QQ 1
World 1
part-r-00002
163.com 1
15900001111 1
17900001111 1
part-r-00003
@163.com 1
@189.com 1
$1100000 1
*[a-z] 1
Code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Partition {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit (word, "1") for every word on the line
            String[] words = value.toString().split(" ");
            for (String s : words) {
                context.write(new Text(s), new Text(1 + ""));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Sum the counts for each word
            int count = 0;
            for (Text t : values) {
                count += Integer.parseInt(t.toString());
            }
            context.write(key, new Text(count + ""));
        }
    }

    public static class MyPartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            // Route keys to reducers by their first character
            String firstChar = key.toString().substring(0, 1);
            if (firstChar.matches("^[a-z]")) {
                return 0;
            } else if (firstChar.matches("^[A-Z]")) {
                return 1;
            } else if (firstChar.matches("^[0-9]")) {
                return 2;
            } else {
                return 3;
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration object
        Configuration conf = new Configuration();
        // 2. Configure it (omit if not needed)
        conf.set("fs.defaultFS", "hdfs://sz01:8020/");
        // 3. Get the job object
        Job job = Job.getInstance(conf, "partition");
        // 4. Set the main class of the job
        job.setJarByClass(Partition.class);
        // 5. Configure the map stage
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Set the partitioner and one reduce task per partition
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(4);
        // 6. Configure the reduce stage
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        setArgs(job, args);
        // 7. Submit the job and print progress
        int isok = job.waitForCompletion(true) ? 0 : 1;
        // Exit the whole job
        System.exit(isok);
    }

    /**
     * Handle the job arguments (input and output paths).
     * @param job
     * @param args
     */
    public static void setArgs(Job job, String[] args) {
        try {
            if (args.length != 2) {
                System.out.println("argument count is wrong!!!");
                System.out.println("Usage: yarn jar *.jar Partition /inputdata /outputdata");
                System.exit(1);
            }
            // Input path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // Delete the output directory if it already exists
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path op = new Path(args[1]);
            if (fs.exists(op)) {
                fs.delete(op, true);
            }
            // Output path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
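A brief note on the partitioner: getPartition must return a value in [0, numReduceTasks), so the four branches above only work because the driver also calls job.setNumReduceTasks(4); each return value then corresponds to one of the part-r-0000x files shown in the expected output.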
Inverted index
Requirement:
Data:
index.html
hadoop hadoop hadoop is nice is good hadoop hadoop
hadoop-info.html
hadoop hadoop hadoop is better
spark-info.html
spark spark spark hadoop is nice nice nice
Output:
better hadoop-info.html:1
good index.html:1
hadoop index.html:5;hadoop-info.html:3;spark-info.html:1
....
spark spark-info.html:3
Code:
DescIndex.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class DescIndex {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The name of the file this split belongs to becomes part of the key
            InputSplit is = context.getInputSplit();
            String fileName = ((FileSplit) is).getPath().getName();
            String line = value.toString();
            String[] words = line.split(" ");
            for (String s : words) {
                // Emit ("word_fileName", "1"); the combiner later folds the file name into the value
                context.write(new Text(s + "_" + fileName), new Text(1 + ""));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Join all "fileName:count" values for one word with ";"
            String str = "";
            for (Text t : values) {
                str += t.toString() + ";";
            }
            context.write(key, new Text(str.substring(0, str.length() - 1)));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration object
        Configuration conf = new Configuration();
        // 2. Configure it (omit if not needed)
        conf.set("fs.defaultFS", "hdfs://sz01:8020/");
        // 3. Get the job object
        Job job = Job.getInstance(conf, "descindex");
        // 4. Set the main class of the job
        job.setJarByClass(DescIndex.class);
        // 5. Configure the map stage
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Set the combiner (defined in MyCombiner.java below)
        job.setCombinerClass(MyCombiner.class);
        // 6. Configure the reduce stage
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        setArgs(job, args);
        // 7. Submit the job and print progress
        int isok = job.waitForCompletion(true) ? 0 : 1;
        // Exit the whole job
        System.exit(isok);
    }

    /**
     * Handle the job arguments (input and output paths).
     * @param job
     * @param args
     */
    public static void setArgs(Job job, String[] args) {
        try {
            if (args.length != 2) {
                System.out.println("argument count is wrong!!!");
                System.out.println("Usage: yarn jar *.jar DescIndex /inputdata /outputdata");
                System.exit(1);
            }
            // Input path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // Delete the output directory if it already exists
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path op = new Path(args[1]);
            if (fs.exists(op)) {
                fs.delete(op, true);
            }
            // Output path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
MyCombiner.java:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class MyCombiner extends Reducer<Text, Text, Text, Text> {

    // Buffers "word_fileName" -> count for the map task this combiner runs in
    Map<Text, Integer> reduceMap = new HashMap<>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // key looks like "hadoop_index.html"; sum up its 1s
        int counter = 0;
        for (Text t : values) {
            counter += Integer.parseInt(t.toString());
        }
        reduceMap.put(new Text(key.toString()), counter);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Re-key the buffered counts as word -> "fileName:count" before they are shuffled
        for (Text in : reduceMap.keySet()) {
            String[] str = in.toString().split("_");
            context.write(new Text(str[0]), new Text(str[1] + ":" + reduceMap.get(in)));
        }
    }
}
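One caveat: Hadoop treats the combiner as an optimization and may run it zero, one or several times per map task, so relying on it to rewrite the key from word_fileName to word, as this exercise does, is fine for a small teaching example but is not behaviour the framework guarantees; in a real job the re-keying would normally be done in the mapper or the reducer itself.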
Deduplication
Requirement:
2017-11-28 北京-天津
2017-11-29 北京-天津
2017-11-27 北京-天津
2017-11-27 北京-天津
2017-11-28 北京-天津
2017-11-26 北京-天津
2017-11-26 北京-哈尔滨
2017-11-29 北京-天津
2017-11-26 北京-三亚
Output:
2017-11-28 北京-天津
2017-11-29 北京-天津
2017-11-27 北京-天津
2017-11-26 北京-天津
2017-11-26 北京-哈尔滨
2017-11-26 北京-三亚
Code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Deduplication: every input line becomes a key, and because MapReduce
 * groups identical keys, each distinct line is written exactly once.
 */
public class Distinct {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The whole line is the key; the value carries no information
            String line = value.toString();
            context.write(new Text(line), NullWritable.get());
        }
    }

    public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Each distinct key arrives here once, so writing it once deduplicates the input
            context.write(key, NullWritable.get());
        }
    }

    // Driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration object
        Configuration conf = new Configuration();
        // 2. Configure it (omit if not needed)
        conf.set("fs.defaultFS", "hdfs://sz01:8020/");
        // 3. Get the job object
        Job job = Job.getInstance(conf, "distinct");
        // 4. Set the main class of the job
        job.setJarByClass(Distinct.class);
        // 5. Configure the map stage
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 6. Configure the reduce stage
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        setArgs(job, args);
        // 7. Submit the job and print progress
        int isok = job.waitForCompletion(true) ? 0 : 1;
        // Exit the whole job
        System.exit(isok);
    }

    /**
     * Handle the job arguments (input and output paths).
     * @param job
     * @param args
     */
    public static void setArgs(Job job, String[] args) {
        try {
            if (args.length != 2) {
                System.out.println("argument count is wrong!!!");
                System.out.println("Usage: yarn jar *.jar Distinct /inputdata /outputdata");
                System.exit(1);
            }
            // Input path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // Delete the output directory if it already exists
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path op = new Path(args[1]);
            if (fs.exists(op)) {
                fs.delete(op, true);
            }
            // Output path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}