hadoop/hbase快速开发环境使用
-------------------- 本文介绍hadoop/hbase的开发环境搭建
hadoop介绍
Hadoop框架中最核心的设计就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇论文所提及而被广为流传的,简单的一句话解释MapReduce就是“任务的分解与结果的汇总”。HDFS是Hadoop分布式文件系统(Hadoop Distributed File System)的缩写,为分布式计算存储提供了底层支持。
hdfs 分布式文件系统
图1 hdfs分布式文件系统结构
从上图可以看到hdfs这个分布式文件系统;由一个namenode作为索引,存放每个数据节点(DataNode)上存放的数据块(blocks)的索引;DataNode心跳向namenode报告自己存放的blocks的索引;hadoop通过在DataNode节点间复制备份多份数据来保证一个节点挂掉后,提供了很高的可用性;默认配置复制的块数为3;即在自己DataNode上一份,在相同机架上另一个DataNode上一份,在不同机架上的一个DataNode上一份; NameNode保存一份block都在那几个DataNode上的索引数据;
任务分派:
图2 :hadoop 的并行任务结构
从上图看出:hadoop的并行任务通过JobTracker来统一管理所有节点上的TaskTracker; JobTracker来负责调度,负载均衡健康检查,监控各个任务的执行情况;jobtracker 心跳向taskTracker汇报任务运行情况;
Map/Reduce
任务运行时:本地DataNode上的数据根据Key为簇,依次输入map程序进行运算;运算结果经过排序合并输入reducer程序进行汇总计算操作;
为什么使用hadoop
官方网站已经给了很多的说明,这里就大致说一下其优点及使用的场景(没有不好的工具,只用不适用的工具,因此选择好场景才能够真正发挥分布式计算的作用):
- 可扩展:不论是存储的可扩展还是计算的可扩展都是Hadoop的设计根本。
- 经济:框架可以运行在任何普通的PC上。
- 可靠:分布式文件系统的备份恢复机制以及MapReduce的任务监控保证了分布式处理的可靠性。
- 高效:分布式文件系统的高效数据交互实现以及MapReduce结合Local Data处理的模式,为高效处理海量的信息作了基础准备。
使用场景:个人觉得最适合的就是海量数据的分析,其实Google最早提出MapReduce也就是为了海量数据分析。同时HDFS最早是为了搜索引擎实现而开发的,后来才被用于分布式计算框架中。海量数据被分割于多个节点,然后由每一个节点并行计算,将得出的结果归并到输出。同时第一阶段的输出又可以作为下一阶段计算的输入,因此可以想象到一个树状结构的分布式计算图,在不同阶段都有不同产出,同时并行和串行结合的计算也可以很好地在分布式集群的资源下得以高效的处理。
hadoop/hbase搭建(开发环境我们只搭建一个虚拟并行平台:单台机器运行)
下载hadoop: http://hadoop.apache.org/ ; 下载,解压;
下载hbase:http://hbase.apache.org/ 下载;解压;
配置hadoop: 在hadoop/conf下
配置hadoop-env.sh
新增环境变量:
export JAVA_HOME=/usr/java
export HADOOP_HOME=/home/zhaopeng/workspace/software/hadoop-0.20.2
export PATH=$PATH:$HADOOP_HOME/bin
配置hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
配置core-site.xml
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/tmp/hadoop/hadoop-${user.name}</value>
<description>A base for other temporary directories.</description>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
配置hbase:(hbase/conf)
hbase-site.xml
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:9000/hbase</value>
<description>The directory shared by region servers.
</description>
</property>
</configuration>
并将hbase/lib/hbase-0.20.6.jar和zookeeper-3.2.2.jar 和 conf/hbase-default.xml和conf/hbase-site.xml都复制到hadoop/conf下;(因为任务运行默认加载conf/下的文件到classpath)
运行hadoop/hbase:
1. hadoop/bin/hadoop namenode -format
2. hadoop/bin/start-all.sh
3.hbase/bin/start-hbase.sh
hadoop常用命令
hadoop dfs -ls :查看hdfs文件系统文件列表
hadoop dfs -cat output/* :查看output目录下所有文件内容
hadoop job -list :列出所有正在执行的任务信息
hadoop job -status jobid :列出jobid这个任务的详细信息;
hadoop jar taskClassName : 执行一个任务;
hadoop dfs -du :查看hdfs的文件大小;
hadoop job -kill jobid :杀掉一个job;
hbase常用命令
hbase shell :打开hbase 数据库的shell客户端
list :查看所有表
create 'tableName','columnName1','columenName3' :新建表;并新建2个column
put 'tableName','rowId','columnName1:1','value1' :在表里插入一条记录
scan 'tableName; 全表select
get 'tableName','rowId' :查找指定rowId的记录
hadoop/hbase 程序
- public class OfferMapper extends TableMapper<Text, IntWritable> {
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
- /**
- * rowkey,value,context
- */
- public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException,
- InterruptedException {
- String titleValue = Bytes.toString(value.getValue(Bytes.toBytes("title"), null));
- for(KeyValue kv : value.list()){
- System.out.println(kv.getValue().toString());
- word.set(kv.getValue());
- context.write(word, one);
- }
- }
- }
- public class OfferReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
- private IntWritable result = new IntWritable();
- public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
- InterruptedException {
- int sum = 0;
- for (IntWritable val : values) {
- sum += val.get();
- }
- result.set(sum);
- context.write(key, result);
- }
- }
- public class OfferTask {
- final static String NAME = "OfferTask";
- /**
- * Sets up the actual job.
- *
- * @param conf The current configuration.
- * @param args The command line parameters.
- * @return The newly created job.
- * @throws IOException When setting up the job fails.
- */
- public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
- String tableName = args[0];
- // Path inputDir = new Path(args[1]);
- Job job = new Job(conf, NAME + "_" + tableName);
- job.setJarByClass(OfferTask.class);
- Scan myScan = new Scan();
- TableMapReduceUtil.initTableMapperJob(tableName, myScan, OfferMapper.class, Text.class,
- IntWritable.class, job);
- TableMapReduceUtil.initTableReducerJob(tableName, OfferReducer.class, job);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- return job;
- }
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: OfferTask <tableName> <out>");
- System.exit(2);
- }
- Job job = createSubmittableJob(conf, otherArgs);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }