hadoop初步介绍:hdfs分布存储+ mr分布计算
- hdfs 和RDBMS区别
- mr 和 网格计算,志愿计算
1,数据存储
磁盘存储 | 解决分布式问题 | 硬件需求 | 系统瓶颈 | |
---|---|---|---|---|
hdfs | 磁盘阵列-集群 | 硬件故障,多数据源的数据准确性 | 普通机 | 数据传输:硬盘带宽 |
RDBMS | 单磁盘 | 专业服务器 | 磁盘寻址:大量数据更新 |
2,分析计算
适用场 | 特点 | 生态圈 | 结构特点 | 数据完整性 | 可扩展性 | 数据集结构化程度 | |
---|---|---|---|---|---|---|---|
mr | PB级数据:批处理 | 一写多读 | yarn集成其他分布式程序,hive,saprk | 读模式 | 低 | 高 | 半、非结构化 |
RDBMS | GB级数据:实时检索,更新 | 持续更新 | 写模式 | 高 | 低 | 结构化 |
3,网格计算,志愿计算
特点 | 适用场景 | |
---|---|---|
网格计算 | 分散节点计算+ 网络共享文件系统 | 小规模数据:无网络传输瓶颈 |
网格计算 | 任务单元化+ 分散计算+ 校验结果 | cup密集型:计算时间>传输时间 |
mr | 转移计算+ 数据本地化 | 作业周期短(小时计),高速局域网内,高配硬件 |
4,mr 对比linux:awk流处理
4.1,awk处理: 年度最高温度统计
4. 2,mapreduce处理:每年最高温度统计
a, ruby写mapreduce:
b, java 写mapreduce ====>idea +maven: 添加依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
map方法
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Map1 extends Mapper<LongWritable, Text, IntWritable,IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//整理的数据输入:
//1982,-8
//1931,-4
String str = value.toString();
String[] arr = str.split(",");
int year=0, tmp=Integer.MIN_VALUE;
//数据转换
try {
year= Integer.parseInt(arr[0]);
tmp= Integer.parseInt(arr[1]);
}catch (Exception e){
e.printStackTrace();
}
//输出:新数据
context.write(new IntWritable(year),new IntWritable(tmp));
}
}
reduce方法
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class Reduce1 extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//输入数据:1931,【-4,23,4,35,6】
//聚合数据: 求每组数据中的max(tmp)
int max=Integer.MIN_VALUE;
Iterator<IntWritable> it = values.iterator();
while (it.hasNext()){
IntWritable next = it.next();
int tmp = next.get();
max= (max >tmp) ? max:tmp;
}
//输出: 最高温度
context.write(key, new IntWritable(max));
}
}
app类: 调度组织job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class App1 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(App1.class);
job.setJobName("maxTmp");
//map,reduce
job.setMapperClass(Map1.class);
job.setReducerClass(Reduce1.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//map端: 预聚合
job.setCombinerClass(Reduce1.class);
job.setNumReduceTasks(3);
//输入输出
FileInputFormat.addInputPath(job,new Path("/home/wang/txt/tmp.txt"));
//删除已存在的目录,以防报错
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path("/home/wang/tmp-out");
if(fs.exists(outPath))
fs.delete(outPath,true);
FileOutputFormat.setOutputPath(job,outPath);
//提交等待
job.waitForCompletion(true);
}
}