hadoop初步介绍:hdfs分布存储+ mr分布计算

  1. hdfs 和RDBMS区别
  2. mr 和 网格计算,志愿计算

1,数据存储

磁盘存储 解决分布式问题 硬件需求 系统瓶颈
hdfs 磁盘阵列-集群 硬件故障,多数据源的数据准确性 普通机 数据传输:硬盘带宽
RDBMS 单磁盘 专业服务器 磁盘寻址:大量数据更新

2,分析计算

适用场 特点 生态圈 结构特点 数据完整性 可扩展性 数据集结构化程度
mr PB级数据:批处理 一写多读 yarn集成其他分布式程序,hive,saprk 读模式 半、非结构化
RDBMS GB级数据:实时检索,更新 持续更新 写模式 结构化

3,网格计算,志愿计算

特点 适用场景
网格计算 分散节点计算+ 网络共享文件系统 小规模数据:无网络传输瓶颈
网格计算 任务单元化+ 分散计算+ 校验结果 cup密集型:计算时间>传输时间
mr 转移计算+ 数据本地化 作业周期短(小时计),高速局域网内,高配硬件

4,mr 对比linux:awk流处理

4.1,awk处理: 年度最高温度统计

hadoop初步介绍:hdfs分布存储+ mr分布计算

4. 2,mapreduce处理:每年最高温度统计

hadoop初步介绍:hdfs分布存储+ mr分布计算

a, ruby写mapreduce:

hadoop初步介绍:hdfs分布存储+ mr分布计算

b, java 写mapreduce ====>idea +maven: 添加依赖

	<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

map方法

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class Map1 extends Mapper<LongWritable, Text, IntWritable,IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //整理的数据输入:
        //1982,-8
        //1931,-4
        String str = value.toString();
        String[] arr = str.split(",");
        int year=0, tmp=Integer.MIN_VALUE;

        //数据转换
        try {
             year= Integer.parseInt(arr[0]);
             tmp= Integer.parseInt(arr[1]);
        }catch (Exception e){
            e.printStackTrace();
        }
        //输出:新数据
        context.write(new IntWritable(year),new IntWritable(tmp));
    }
}

reduce方法

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;

public class Reduce1 extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //输入数据:1931,【-4,23,4,35,6】
        //聚合数据: 求每组数据中的max(tmp)
        int max=Integer.MIN_VALUE;
        Iterator<IntWritable> it = values.iterator();
        while (it.hasNext()){
            IntWritable next = it.next();
            int tmp = next.get();

            max= (max >tmp) ? max:tmp;
        }
        //输出: 最高温度
        context.write(key, new IntWritable(max));
    }
}

app类: 调度组织job

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class App1 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(App1.class);
        job.setJobName("maxTmp");

        //map,reduce
        job.setMapperClass(Map1.class);
        job.setReducerClass(Reduce1.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        //map端: 预聚合
        job.setCombinerClass(Reduce1.class);
        job.setNumReduceTasks(3);
        //输入输出
        FileInputFormat.addInputPath(job,new Path("/home/wang/txt/tmp.txt"));
        //删除已存在的目录,以防报错
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("/home/wang/tmp-out");
        if(fs.exists(outPath))
            fs.delete(outPath,true);
        FileOutputFormat.setOutputPath(job,outPath);

        //提交等待
        job.waitForCompletion(true);
    }
}