翻开工具箱,海量数据存储处理之各种黑科技

一、前言

本文介绍大数据最常涉及的四个组件:Hadoop、Spark、Storm、Hbase。

二、Hadoop

2.1 Hadoop概要

Hadoop是一个大数据解决方案,它提供了一套分布式系统基础架构,其核心内容包含 hdfs 和mapreduce(附:hadoop2.0 引入 yarn,目前稳定可用版本已到Hadoop3.0)。

hdfs 是用于提供数据存储的,mapreduce 是用于方便数据计算的。

  1. hdfs 又对应 namenode 和 datanode,其中,namenode 负责保存元数据的基本信息,datanode 直接存放数据本身;
  2. mapreduce 对应 jobtracker 和 tasktracker,其中,jobtracker 负责分发任务,tasktracker 负责执行具体任务;
  3. 对应到 master/slave 架构,namenode 和 jobtracker 就应该对应到 master, datanode和 tasktracker 就应该对应到 slave。无论是hdfs 还是 mapreduce ,主从节点之间的都是通过heartbeat心跳来确认存活的。

2.2 第一组件:HDFS

HDFS 架构图,如下图所示:

HDFS中,NameNode 和 Secondary NameNode 是Master,所有的 Datanode 是Slave。

翻开工具箱,海量数据存储处理之各种黑科技
1、Client

Client(代表用户) 通过与 NameNode 和 DataNode 交互访问 HDFS 中 的文件。 Client 提供了一个类似 POSIX 的文件系统接口供用户调用。

2、NameNode

功能:负责管理 HDFS 的目录树和相关的文件元数据信息。

整个 Hadoop 集群中只有一个 NameNode。 它是整个系统的“ 总管”, 负责管理 HDFS 的目录树和相关的文件元数据信息。 这些信息是以“ fsimage”( HDFS 元数据镜像文件)和“editlog”(HDFS 文件改动日志)两个文件形式存放在本地磁盘,当 HDFS 重启时重新构造出来的。此外, NameNode 还负责监控各个 DataNode 的健康状态, 一旦发现某个 DataNode 宕掉,则将该 DataNode 移出 HDFS 并重新备份其上面的数据。

3、Secondary NameNode

功能:定期合并fsimage 和 edits 日志, 并传输给 NameNode。

Secondary NameNode 最重要的任务并不是为 NameNode 元数据进行热备份, 而是定期合并fsimage 和 edits 日志, 并传输给 NameNode。 这里需要注意的是,为了减小 NameNode 压力, NameNode 自己并不会合并 fsimage 和 edits, 并将文件存储到磁盘上, 而是交由Secondary NameNode 完成。

4、DataNode

功能:负责实际的数据存储, 并将数据信息定期汇报给 NameNode。

一般而言, 每个 Slave 节点上安装一个 DataNode, 它负责实际的数据存储, 并将数据信息定期汇报给 NameNode。 DataNode 以固定大小的 block 为基本单位组织文件内容, 默认情况下block 大小为 64MB。 当用户上传一个大的文件到 HDFS 上时, 该文件会被切分成若干个 block,分别存储到不同的 DataNode ; 同时,为了保证数据可靠, 会将同一个 block 以流水线方式写到若干个(默认是 3,该参数可配置)不同的 DataNode 上。 这种文件切割后存储的过程是对用户透明的。

2.3 第二组件:MapReduce

Hadoop MapReduce 采用了 Master/Slave(M/S)架构,具体如图所示。

Map Reduce中,JobTracker 是Master,所有的 TaskTracker 是Slave。

翻开工具箱,海量数据存储处理之各种黑科技

MapReduce主要由以下几个组件组成:Client、JobTracker、TaskTracker 和 Task。 下面分别对这几个组件进行介绍。

1、Client

用户编写的 MapReduce 程序通过 Client 提交到 JobTracker 端; 同时, 用户可通过 Client 提供的一些接口查看作业运行状态。

在 Hadoop 内部用“作业”(Job) 表示 MapReduce 程序。一个 MapReduce 程序可对应若干个作业(Job),而每个作业(Job)会被分解成若干个 Map/Reduce 任务(Task)。

2、JobTracker

JobTracker 主要负责资源监控和作业调度。JobTracker 监控所有 TaskTracker 与作业的健康状况,一旦发现失败情况后,其会将相应的任务转移到其他节点;同时 JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。

在 Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。

3、TaskTracker

TaskTracker 会周期性地通过 Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给 JobTracker, 同时接收 JobTracker 发送过来的命令并执行相应的操作(如启动新任务、 杀死任务等)。

TaskTracker 使用“slot” 等量划分本节点上的资源量。“slot” 代表计算资源(CPU、内存等)。一个 任务Task 获取到一个 slot 后才有机会运行,而 Hadoop 调度器的作用就是将各个TaskTracker 上的空闲 slot 分配给 Task 使用。 slot 分为 Map slot 和 Reduce slot 两种,分别供MapTask 和 Reduce Task 使用。

这样一来,TaskTracker 通过 slot 数目(可配置参数)限定 Task 的并发度。

4、Task

Task 分为 Map Task 和 Reduce Task 两种, 均由 TaskTracker 启动。

相对于 HDFS 以固定大小的 block 为基本单位存储数据, MapReduce 的其处理单位是 split。split 是一个逻辑概念, 它只包含一些元数据信息, 比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。

但需要注意的是,split 的多少决定了 Map Task 的数目 ,因为每个 split 会交由一个 Map Task 处理。

5、Map Task 执行过程

Map Task 执行过程如图所示:

翻开工具箱,海量数据存储处理之各种黑科技

由该图可知,Map Task 先将对应的 split 迭代解析成一个个key/value 对,依次调用用户自定义的 map() 函数进行处理,然后临时数据被分成若干个 partition(每个 partition 将被一个 Reduce Task 处理),最终将临时结果存放到本地磁盘上。

6、Reduce Task 执行过程

上面的Map Task执行流程图中,中间一个步骤是临时数据被分成若干个 partition,其实,每个 partition 将被一个 Reduce Task 处理,那么 Reduce Task 是如何处理的呢?

Reduce Task 执行过程分为三个阶段:

  1. 从远程节点上读取 MapTask 中间结果(称为“Shuffle 阶段”);
  2. 按照 key 对 key/value 对进行排序(称为“ Sort 阶段”);
  3. 依次读取<key, value list>,调用用户自定义的 reduce() 函数处理,并将最终结果存到 HDFS 上(称为“ Reduce 阶段”)。

2.4 Hadoop MapReduce作业的生命周期

步骤一:作业提交与初始化

作业提交:用户提交作业(即Job)后, 首先由 JobClient 实例将作业相关信息, 比如将程序 jar 包、作业配置文件、 分片元信息文件等上传到分布式文件系统( 一般为 HDFS)上,其中,分片元信息文件记录了每个输入分片的逻辑位置信息。 然后 JobClient 通过 RPC (即Romote Procedure Call,远程过程调用)通知 JobTracker。

作业初始化:JobTracker 收到新作业提交请求后, 由作业调度模块对作业进行初始化:为作业创建一个 JobInProgress 对象以跟踪作业运行状况, 而 JobInProgress 则会为每个 Task 创建一个 TaskInProgress 对象以跟踪每个任务的运行状态, TaskInProgress 可能需要管理多个“ Task 运行尝试”( 称为“ Task Attempt”)。

步骤二:任务调度与监控

由于任务调度和监控的功能均由 JobTracker 完成,TaskTracker 周期性地通过Heartbeat 向 JobTracker 汇报本节点的资源使用情况, 一旦出现空闲资源, JobTracker会按照一定的策略选择一个合适的任务使用该空闲资源, 这由任务调度器完成。 任务调度器是一个可插拔的独立模块, 且为双层架构, 即首先选择作业(即Job), 然后从该作业(Job)中选择任务(即Task), 其中,选择任务时需要重点考虑数据本地性。

此外,JobTracker 跟踪作业的整个运行过程,并为作业的成功运行提供全方位的保障。 首先, 当 TaskTracker 或者 Task 失败时, 转移计算任务 ; 其次, 当某个 Task 执行进度远落后于同一作业的其他 Task 时,为之启动一个相同Task, 并选取计算快的 Task 结果作为最终结果。

步骤三:任务运行环境准备

运行环境准备包括 JVM 启动和资源隔离,均由 TaskTracker 实现。

JVM启动:TaskTracker 为每个Task 启动一个独立的 JVM 以避免不同 Task 在运行过程中相互影响 ;
资源隔离:TaskTracker 使用了操作系统进程实现资源隔离以防止 Task 滥用资源。

步骤四:任务执行

TaskTracker 为 Task 准备好运行环境后, 便会启动 Task。 在运行过程中, 每个 Task 的最新进度首先由 Task 通过 RPC 汇报给 TaskTracker, 再由 TaskTracker 汇报给 JobTracker。

步骤五:作业完成

待所有 Task 执行完毕后, 整个作业(即Job)执行成功。

2.5 YARN

YARN 是一个资源管理、任务调度的框架,主要包含三大模块ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)。其中,ResourceManager 负责所有资源的监控、分配和管理;NodeManager 负责每一个节点的维护;ApplicationMaster 负责每一个具体应用程序的调度和协调。

对于所有的 applications,RM 拥有绝对的控制权和对资源的分配权。而每个 AM 则会和 RM 协商资源,同时和 NodeManager 通信来执行和监控 task。

几个模块之间的关系如图所示。

翻开工具箱,海量数据存储处理之各种黑科技

对于上图的解释:
图中左边两个Client客户实例请求,一个红色一个蓝色。
第一条线MapReduce Status表示传递当前的MapReduce的状态,每个应用程序都包含一个主节点AM,由于图中表示两个应用程序正在被处理,三个红色的Container容器都将自己正在处理的MapReduce的状态传递给红色的AM应用程序主节点,蓝色的Container容器也是将当前的MapReduce的状态传递给蓝色的AM应用程序主节点。
第二条线Job Submission表示作业提交,就是指左边两个Client将作业提交给RM。
第三条线Node Status表示节点状态,NodeManager 负责每一个节点的维护,三个NodeManager将节点状态信息提交给RM全局资源管理器。
第四条线Resource Request表示做出资源请求动作,图中是两个AM为自己的应用程序对RM做出资源请求。

问题1:AM和NM是否存在包含、并列或其他关系?
回答1:AM和NM不存在任何关系,AM对应的是应用程序,一个应用程序包含一个AM,NM对应的是Yarn内部节点,一个内部节点需要一个NM来维护。
问题2:AM与RM的关系?NM与RM的关系?
回答2:因为AM对应的是一个应用程序,所以自然会向RM发出Resource Request资源请求,为应用程序请求资源;因为NM对应的是一个内部节点,所以自然会向RM汇报Node Status节点状态信息。

ResourceManager 模块

  1. RM含义:ResourceManager 负责整个集群的资源管理和分配,是一个全局的资源管理系统。
  2. RM与NM:NodeManager 以心跳的方式向 ResourceManager 汇报资源使用情况(目前主要是 CPU 和内存的使用情况)。RM 只接受 NM 的资源回报信息,对于具体的资源处理则交给 NM 自己处理。
  3. RM与AM:YARN Scheduler 根据 application 的请求为其分配资源,不负责 application job 的监控、追踪、运行状态反馈、启动等工作。

NodeManager 模块

  1. NM含义:NodeManager 是每个节点上的资源和任务管理器,它是管理这台机器的代理,负责该节点程序的运行,以及该节点资源的管理和监控。YARN集群每个节点都运行一个NodeManager。
  2. NM与RM:NodeManager 定时向 ResourceManager 汇报本节点资源(CPU、内存)的使用情况和Container 的运行状态。当 ResourceManager 宕机时 NodeManager 自动连接 RM 备用节点。
  3. NM与AM:NodeManager 接收并处理来自 ApplicationMaster 的 Container 启动、停止等各种请求。

ApplicationMaster 模块

  1. AM含义:用户提交的每个应用程序均包含一个ApplicationMaster,它可以运行在ResourceManager以外的机器上。
  2. AM与RM:负责与 RM 调度器协商以获取资源(用 Container 表示)。
  3. AM与NM:与 NM 通信以启动/停止任务。
  4. AM与任务Task:将得到的任务进一步分配给内部的任务(资源的二次分配)。
  5. AM与任务Task:监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。

注:RM 只负责监控 AM,并在 AM 运行失败时候启动它。RM 不负责 AM 内部任务的容错,任务的容错由 AM 完成。

2.6 YARN运行流程

YARN运行流程

  1. client 向 RM 提交应用程序,其中包括启动该应用的 ApplicationMaster 的必须信息,例如ApplicationMaster 程序、启动 ApplicationMaster 的命令、用户程序等。
  2. ResourceManager 启动一个 container 用于运行 ApplicationMaster。
  3. 启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳。
  4. ApplicationMaster 向 ResourceManager 发送请求,申请相应数目的 container。
  5. ResourceManager 返回 ApplicationMaster 的申请的 containers 信息。申请成功的
    container,由 ApplicationMaster 进行初始化。container 的启动信息初始化后,AM 与对应的 NodeManager 通信,要求 NM 启动 container。AM 与 NM 保持心跳,从而对 NM 上运行的任务进行监控和管理。
  6. container 运行期间,ApplicationMaster 对 container 进行监控。container 通过 RPC 协议向对应的 AM 汇报自己的进度和状态等信息。
  7. 应用运行期间,client 直接与 AM 通信获取应用的状态、进度更新等信息。
  8. 应用运行结束后,ApplicationMaster 向 ResourceManager 注销自己,并允许属于它的container 被收回。

2.7 Hadoop2与Hadoop3的区别

翻开工具箱,海量数据存储处理之各种黑科技

hadoop2.X 与 hadoop3.X 的 22 条区别,如下:

1.License

adoop 2.x - Apache 2.0,开源
Hadoop 3.x - Apache 2.0,开源

2.支持的最低Java版本

Hadoop 2.x - java的最低支持版本是java 7
Hadoop 3.x - java的最低支持版本是java 8

3.容错

Hadoop 2.x - 可以通过复制(浪费空间)来处理容错。
Hadoop 3.x - 可以通过Erasure编码处理容错。

4.数据平衡

Hadoop 2.x - 对于数据,平衡使用HDFS平衡器。
Hadoop 3.x - 对于数据,平衡使用Intra-data节点平衡器,该平衡器通过HDFS磁盘平衡器CLI调用。

5.存储Scheme

Hadoop 2.x - 使用3X副本Scheme
Hadoop 3.x - 支持HDFS中的擦除编码。

6.存储开销

Hadoop 2.x - HDFS在存储空间中有200%的开销。
Hadoop 3.x - 存储开销仅为50%。

7.存储开销示例

Hadoop 2.x - 如果有6个块,那么由于副本方案(Scheme),将有18个块占用空间。
Hadoop 3.x - 如果有6个块,那么将有9个块空间,6块block,3块用于奇偶校验。

8.YARN时间线服务

Hadoop 2.x - 使用具有可伸缩性问题的旧时间轴服务。
Hadoop 3.x - 改进时间线服务v2并提高时间线服务的可扩展性和可靠性。

9.默认端口范围

Hadoop 2.x - 在Hadoop 2.0中,一些默认端口是Linux临时端口范围。所以在启动时,他们将无法绑定。
Hadoop 3.x - 但是在Hadoop 3.0中,这些端口已经移出了短暂的范围。

10.工具

Hadoop 2.x - 使用Hive,pig,Tez,Hama,Giraph和其他Hadoop工具。
Hadoop 3.x - 可以使用Hive,pig,Tez,Hama,Giraph和其他Hadoop工具。

11.兼容的文件系统

Hadoop 2.x - HDFS(默认FS),FTP文件系统:它将所有数据存储在可远程访问的FTP服务器上。 Amazon S3(简单存储服务)文件系统Windows Azure存储Blob(WASB)文件系统。
Hadoop 3.x - 它支持所有前面以及Microsoft Azure Data Lake文件系统。

12.Datanode资源

Hadoop 2.x - Datanode资源不专用于MapReduce,我们可以将它用于其他应用程序。
Hadoop 3.x - 此处数据节点资源也可用于其他应用程序。

13.MR API兼容性

Hadoop 2.x - 与Hadoop 1.x程序兼容的MR API,可在Hadoop 2.X上执行
Hadoop 3.x - 此处,MR API与运行Hadoop 1.x程序兼容,以便在Hadoop 3.X上执行

14.支持Microsoft Windows

Hadoop 2.x - 它可以部署在Windows上。
Hadoop 3.x - 它也支持Microsoft Windows。

15.插槽/容器

Hadoop 2.x - Hadoop 1适用于插槽的概念,但Hadoop 2.X适用于容器的概念。通过容器,我们可以运行通用任务。
Hadoop 3.x - 它也适用于容器的概念。

16.单点故障

Hadoop 2.x - 具有SPOF的功能,因此只要Namenode失败,它就会自动恢复。
Hadoop 3.x - 具有SPOF的功能,因此只要Namenode失败,它就会自动恢复,无需人工干预就可以克服它。

17.HDFS联盟

Hadoop 2.x - 在Hadoop 1.0中,只有一个NameNode来管理所有Namespace,但在Hadoop 2.0中,多个NameNode用于多个Namespace。
Hadoop 3.x - Hadoop 3.x还有多个名称空间用于多个名称空间。

18.可扩展性

Hadoop 2.x - 我们可以扩展到每个群集10,000个节点。
Hadoop 3.x - 更好的可扩展性。 我们可以为每个群集扩展超过10,000个节点。

19.更快地访问数据

Hadoop 2.x - 由于数据节点缓存,我们可以快速访问数据。
Hadoop 3.x - 这里也通过Datanode缓存我们可以快速访问数据。

20.HDFS快照

Hadoop 2.x - Hadoop 2增加了对快照的支持。 它为用户错误提供灾难恢复和保护。
Hadoop 3.x - Hadoop 2也支持快照功能。

21.平台

Hadoop 2.x - 可以作为各种数据分析的平台,可以运行事件处理,流媒体和实时操作。
Hadoop 3.x - 这里也可以在YARN的顶部运行事件处理,流媒体和实时操作。

22.群集资源管理

Hadoop 2.x - 对于群集资源管理,它使用YARN。 它提高了可扩展性,高可用性,多租户。
Hadoop 3.x - 对于集群,资源管理使用具有所有功能的YARN。

三、Spark

3.1 Spark架构

Spark 提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求。

Spark 核心架构,如下图所示:
翻开工具箱,海量数据存储处理之各种黑科技
对于上图中重要概念解释:

Spark Core(即上图中第二层Apache Spark)
Spark Core 中包含 Spark 的基本功能,尤其是定义 RDD 的 API、操作以及这两者上的动作,其他 Spark 的库都是构建在 RDD 和 Spark Core 之上的。

注意:RDD,Resilient Distributed DataSet,即弹性分布式数据集。

Spark SQL
提供 HiveQL(一种 Apache Hive 的 SQL 变体 Hive 查询语言)与 Spark 进行交互的 API。每个数据库表被当做一个 RDD,Spark SQL 查询被转换为 Spark 操作。

Spark Streaming
对实时数据流进行处理和控制。Spark Streaming 允许程序能够像普通 RDD 一样处理实时数据。

Mllib(Machine Learning)
一个常用机器学习算法库,算法被实现为对 RDD 的 Spark 操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。

GraphX
控制图、并行图操作和计算的一组算法和工具的集合。GraphX 扩展了 RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。

3.2 Spark核心组件

Spark 组件交互,如下图所示:

翻开工具箱,海量数据存储处理之各种黑科技
对于上图中重要概念解释:

Cluster Manager
在 standalone 模式中即为 Master 主节点,控制整个集群,监控 worker。
在 YARN 模式中为集群管理器。

Worker Node
作为从节点,负责控制计算节点,启动 Executor 或者 Driver。

Driver
运行 Application 的 main() 函数。

Executor
执行器,是为某个 Application 运行在 worker node 上的一个进程。

3.3 Spark编程模型

Spark编程模型,如下图所示:
翻开工具箱,海量数据存储处理之各种黑科技

Spark 应用程序从编写到提交、执行、输出的整个过程如图所示,图中描述的步骤如下:

  1. 用户使用SparkContext提供的API(常用的有textFile、sequenceFile、runJob、stop等)编写 Driver application 程序。此外 SQLContext、HiveContext 及 StreamingContext 对SparkContext 进行封装,并提供了 SQL、Hive 及流式计算相关的 API。
  2. 使用SparkContext提交的用户应用程序,首先会使用BlockManager和BroadcastManager 将任务的 Hadoop 配置进行广播。然后由 DAGScheduler 将任务转换为 RDD 并组织成 DAG,DAG 还将被划分为不同的 Stage。最后由 TaskScheduler 借助 ActorSystem 将任务提交给集群管理器(Cluster Manager)。
  3. 集群管理器(ClusterManager)给任务分配资源,即将具体任务分配到Worker上,Worker创建 Executor 来处理任务的运行,运行结果保存到Store存储中。Standalone、YARN、Mesos、EC2 等都可以作为 Spark的集群管理器;HDFS、Amzon、S3、Tachyon等都可以作为Store存储。

3.4 Spark计算模型

Spark计算模型运作图,如下所示:
翻开工具箱,海量数据存储处理之各种黑科技

上图中,RDD 英文全称 Resiliennt Distributed Datasets,译为 弹性分布式数据集 ,可以看做是对各种数据计算模型的统一抽象,Spark 的计算过程主要是 RDD 的迭代计算过程。RDD 的迭代计算过程非常类似于管道,即上图中每一个分区(从分区1到分区N)可以看做是一个管道。分区数量取决于 partition 数量的设定,每个分区的数据只会在一个 Task 中计算。所有分区可以在多个机器节点的 Executor 上并行执行。

3.5 Spark运行流程

Spark运行流程图,如下图所示:
翻开工具箱,海量数据存储处理之各种黑科技
对于上图的解释:

  1. 构建 Spark Application 的运行环境,启动 SparkContext。
  2. SparkContext 向资源管理器(可以是 Standalone、Mesos 、Yarn )申请运行Executor 资源,并启动 StandaloneExecutorbackend 。
  3. Executor 向 SparkContext 申请 Task 。
  4. SparkContext 将应用程序分发给 Executor。
  5. SparkContext 构建成 DAG 图,将 DAG 图分解成 Stage 、将 Taskset 发送给 Task Scheduler ,最后由 Task Scheduler 将 Task 发送给 Executor 运行。
  6. Task 在 Executor 上运行,运行完释放所有资源。

3.6 Spark RDD流程

Spark RDD流程图,如下所示:
翻开工具箱,海量数据存储处理之各种黑科技
对于上图的解释:

  1. 创建 RDD 对象。
  2. DAGScheduler 模块介入运算,计算 RDD 之间的依赖关系,RDD 之间的依赖关系就形成了DAG。
  3. 每一个 Job 被分为多个 Stage。划分 Stage 的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个 Stage,避免多个 Stage 之间的消息传递开销。

3.7 Spark RDD

(1 )RDD 的创建方式

1)从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、HBase)输入(例如 HDFS)创建。
2)从父 RDD 转换得到新 RDD。
3)通过 parallelize 或 makeRDD 将单机数据创建为分布式 RDD。

(2 )RDD 的两种操作算子 ( 转换(Transformation)与行动(Action) )

对于 RDD 可以有两种操作算子:转换(Transformation)与行动(Action)。

1) 转换(Transformation):Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
翻开工具箱,海量数据存储处理之各种黑科技

2)行动(Action):Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark 系统。
翻开工具箱,海量数据存储处理之各种黑科技

四、Storm

4.1 Storm概要

Storm 是一个免费并开源的分布式实时计算系统。利用 Storm 可以很容易做到可靠地处理无限的数据流,像 Hadoop 批量处理大数据一样,Storm 可以实时处理数据。

Storm集群架构,如下图所示:

翻开工具箱,海量数据存储处理之各种黑科技
对于上图的解释:

1、Nimbus (作为master,职能:将代码分发给Supervisor)
Storm 集群的 Master 节点,负责分发用户代码,指派给具体的 Supervisor 节点上的 Worker 节点,去运行 Topology 对应的组件(Spout/Bolt)的 Task。

2、Supervisor (作为slave,职能:管理 Worker进程的启动和终止 )
Storm 集群的从节点,负责管理运行在 Supervisor 节点上的每一个 Worker 进程的启动和终止。通过 Storm 的配置文件中的 supervisor.slots.ports 配置项,可以指定在一个 Supervisor 上最大允许多少个 Slot,每个 Slot 通过端口号来唯一标识,一个端口号对应一个 Worker 进程(如果该Worker 进程被启动)。

3、Worker ( 具体处理组件逻辑的进程 )
运行具体处理组件逻辑的进程。Worker 运行的任务类型只有两种,一种是 Spout 任务,一种是Bolt 任务。

4、Task
worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,不同 spout/bolt 的 task 可能会共享一个物理线程,该线程称为 executor。

5、ZooKeeper
用来协调 Nimbus 和 Supervisor,如果 Supervisor 因故障出现问题而无法运行 Topology,Nimbus 会第一时间感知到,并重新分配 Topology 到其它可用的 Supervisor 上运行。

4.2 Storm编程模型(spout->tuple->bolt)

strom 在运行中可分为 spout 与 bolt 两个组件,其中,数据源从 spout 开始,数据以 tuple 的方式发送到 bolt,多个 bolt 可以串连起来,一个 bolt 也可以接入多个 spot/bolt。

整个运行时原理如下图:

翻开工具箱,海量数据存储处理之各种黑科技
对上图主要概念的解释:

1、Topology
Storm 中运行的一个实时应用程序的名称。将 Spout、 Bolt 整合起来的拓扑图。定义了 Spout 和Bolt 的结合关系、并发数量、配置等等。

2、Spout
在一个 topology 中获取源数据流的组件。通常情况下 spout 会从外部数据源中读取数据,然后转换为 topology 内部的源数据。

3、Bolt
接受数据然后执行处理的组件,用户可以在其中执行自己想要的操作。

4、Tuple
一次消息传递的基本单元,理解为一组消息就是一个 Tuple。

5、Stream
Tuple 的集合。表示数据的流向。

4.3 Storm Topolog运行

在 Storm 中,一个实时应用的计算任务被打包作为 Topology 发布,这同 Hadoop MapReduce 任务相似。但是有一点不同的是:在 Hadoop 中,MapReduce 任务最终会执行完成后结束;而在Storm 中,Topology 任务一旦提交后永远不会结束,除非你显示去停止任务。计算任务Topology 是由不同的 Spouts 和 Bolts,通过数据流(Stream)连接起来的图。一个 Storm 在集群上运行一个 Topology 时,主要通过以下 3 个实体来完成 Topology 的执行工作:

(1). Worker (进程)
(2). Executor (线程)
(3). Task
翻开工具箱,海量数据存储处理之各种黑科技

1、Worker( 1 个 worker 进程执行的是 1 个 topology 的子集 )
1 个 worker 进程执行的是 1 个 topology 的子集(注:不会出现 1 个 worker 为多个 topology服务)。1 个 worker 进程会启动 1 个或多个 executor 线程来执行 1 个 topology 的component(spout 或 bolt)。因此,1 个运行中的 topology 就是由集群中多台物理机上的多个worker 进程组成的。

2、Executor( executor 是 1 个被 worker 进程启动的单独线程 )
Executor 是 1 个被 worker 进程启动的单独线程。每个 executor 只会运行 1 个 topology 的 1 个component(spout 或 bolt)的 task(注:task 可以是 1 个或多个,storm 默认是 1 个component 只生成 1 个 task,executor 线程里会在每次循环里顺序调用所有 task 实例)。

3、Task( 最终运行 spout 或 bolt 中代码的单元 )
Task是最终运行 spout 或 bolt 中代码的单元(注:1 个 task 即为 spout 或 bolt 的 1 个实例,executor 线程在执行期间会调用该 task 的 nextTuple 或 execute 方法)。topology 启动后,1个 component(spout 或 bolt)的 task 数目是固定不变的,但该 component 使用的 executor 线程数可以动态调整(例如:1 个 executor 线程可以执行该 component 的 1 个或多个 task 实例)。这意味着,对于 1 个 component 存在这样的条件#threads<=#tasks(即:线程数小于等于 task 数目)。默认情况下 task 的数目等于 executor 线程数目,即 1 个 executor 线程只运行 1 个 task。

翻开工具箱,海量数据存储处理之各种黑科技

4.4 Storm Streaming Grouping

Stream grouping 是 Storm 中最重要的抽象了,它能够控制 Spot/Bolt 对应的 Task 以什么样的方式来分发 Tuple,将 Tuple 发射到目的 Spot/Bolt 对应的 Task。
翻开工具箱,海量数据存储处理之各种黑科技
目前,Storm Streaming Grouping 支持如下几种类型:

1、huffle Grouping
随机分组,尽量均匀分布到下游 Bolt 中将流分组定义为混排。这种混排分组意味着来自 Spout 的输入将混排,或随机分发给此 Bolt 中的任务。shuffle grouping 对各个 task 的 tuple 分配的比较均匀。

2、Fields Grouping
按字段分组,按数据中 field 值进行分组;相同 field 值的 Tuple 被发送到相同的 Task 这种grouping 机制保证相同 field 值的 tuple 会去同一个 task。

3、All grouping :广播
广播发送, 对于每一个 tuple 将会复制到每一个 bolt 中处理。

4、Global grouping
全局分组,Tuple 被分配到一个 Bolt 中的一个 Task,实现事务性的 Topology。Stream 中的所有的 tuple 都会发送给同一个 bolt 任务处理,所有的 tuple 将会发送给拥有最小 task_id 的 bolt任务处理。

5、None grouping :不分组
不关注并行处理负载均衡策略时使用该方式,目前等同于 shuffle grouping,另外 storm 将会把bolt 任务和他的上游提供数据的任务安排在同一个线程下。

6、Direct grouping :直接分组 指定分组
由 tuple 的发射单元直接决定 tuple 将发射给那个 bolt,一般情况下是由接收 tuple 的 bolt 决定接收哪个 bolt 发射的 Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个 task 处理这个消息。 只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息 tuple 必须使用 emitDirect 方法来发射。消息处理者可以通过TopologyContext 来获取处理它的消息的 taskid (OutputCollector.emit 方法也会返回taskid)。

五、Hbase

5.1 Hbase概要

5.1.1 Hbase 是一个通过大量廉价的机器解决海量数据的高速存储和读取的分布式数据库解决方案

Hbase 是分布式、面向列的开源数据库(其实准确的说是面向列族)。HDFS 为 Hbase 提供可靠的底层数据存储服务,MapReduce 为 Hbase 提供高性能的计算能力,Zookeeper 为 Hbase 提供稳定服务和 Failover 机制,因此我们说 Hbase 是一个通过大量廉价的机器解决海量数据的高速存储和读取的分布式数据库解决方案。

5.1.2 Hbase的列式存储

Hbase采用列式存储,列方式所带来的重要好处之一就是,由于查询中的选择规则是通过列来定义的,因此整个数据库是自动索引化的。
翻开工具箱,海量数据存储处理之各种黑科技

这里的列式存储其实说的是列族存储,Hbase 是根据列族来存储数据的。列族下面可以有非常多的列,列族在创建表的时候就必须指定。为了加深对 Hbase 列族的理解,下面是一个简单的关系型数据库的表和 Hbase 数据库的表:

翻开工具箱,海量数据存储处理之各种黑科技
翻开工具箱,海量数据存储处理之各种黑科技

5.1.3 Hbase的核心概念

1、Column Family 列族

Column Family 又叫列族,Hbase 通过列族划分数据的存储,列族下面可以包含任意多的列,实现灵活的数据存取。Hbase 表的创建的时候就必须指定列族。就像关系型数据库创建的时候必须指定具体的列是一样的。Hbase的列族不是越多越好,官方推荐的是列族最好小于或者等于3。我们使用的场景一般是 1 个列族。

2、Rowkey( Rowkey 查询,Rowkey 范围扫描,全表扫描 )

Rowkey 的概念和 mysql 中的主键是完全一样的,Hbase 使用 Rowkey 来唯一的区分某一行的数据。Hbase 只支持 3 中查询方式:基于 Rowkey 的单行查询,基于 Rowkey 的范围扫描,全表扫描。

3、Region 分区

Region:Region 的概念和关系型数据库的分区或者分片差不多。Hbase 会将一个大表的数据基于 Rowkey 的不同范围分配到不通的 Region 中,每个 Region 负责一定范围的数据访问和存储。这样即使是一张巨大的表,由于被切割到不通的 region,访问起来的时延也很低。

4、TimeStamp 多版本

TimeStamp 是实现 Hbase 多版本的关键。在 Hbase 中使用不同的 timestame 来标识相同 rowkey 行对应的不通版本的数据。在写入数据的时候,如果用户没有指定对应的timestamp,Hbase 会自动添加一个 timestamp,timestamp 和服务器时间保持一致。在
Hbase 中,相同 rowkey 的数据按照 timestamp 倒序排列。默认查询的是最新的版本,用户可同指定 timestamp 的值来读取旧版本的数据。

5.2 Hbase核心架构

Hbase 是由 Client、Zookeeper、HMaster、HRegionServer、HDFS 等几个组建组成。

Hbase 架构如下图所示:

翻开工具箱,海量数据存储处理之各种黑科技

1、Hbase组件:Client

Client 包含了访问 Hbase 的接口,另外 Client 还维护了对应的 cache 来加速 Hbase 的
访问,比如 cache 的.META.元数据的信息。

2、Hbase组件:Zookeeper

Hbase 通过 Zookeeper 来做 master 的高可用、RegionServer 的监控、元数据的入口
以及集群配置的维护等工作。具体工作如下:

  1. 通过 Zoopkeeper 来保证集群中只有 1 个 master 在运行,如果 master 异
    常,会通过竞争机制产生新的 master 提供服务
  2. 通过 Zoopkeeper 来监控 RegionServer 的状态,当 RegionSevrer 有异常的
    时候,通过回调的形式通知 Master RegionServer 上下限的信息
  3. 通过 Zoopkeeper 存储元数据的统一入口地址。

3、Hbase组件:HMaster

HMaster 节点的主要职责如下:

  1. 为 RegionServer 分配 Region
  2. 维护整个集群的负载均衡
  3. 维护集群的元数据信息发现失效的 Region,并将失效的 Region 分配到正常RegionServer 上当 RegionSever 失效的时候,协调对应 Hlog 的拆分。

4、Hbase组件:HRegionServer

HRegionServer 直接对接用户的读写请求,是真正的“干活”的节点。它的功能概括如下:

  1. 管理 master 为其分配的 Region
  2. 处理来自客户端的读写请求
  3. 负责和底层 HDFS 的交互,存储数据到 HDFS
  4. 负责 Region 变大以后的拆分
  5. 负责 Storefile 的合并工作

5、Hbase组件:HDFS

HDFS 为 Hbase 提供最终的底层数据存储服务,同时为 Hbase 提供高可用(Hlog 存储在
HDFS)的支持。

6、Region 寻址方式(通过 zookeeper .META)

Region 寻址方式,如下图所示:
翻开工具箱,海量数据存储处理之各种黑科技

第 1 步:Client 请求 ZK 获取.META.所在的 RegionServer 的地址。
第 2 步:Client 请求.META.所在的 RegionServer 获取访问数据所在的 RegionServer 地
址,client 会将.META.的相关信息 cache 下来,以便下一次快速访问。
第 3 步:Client 请求数据所在的 RegionServer,获取所需要的数据。

5.3 Hbase写入流程

Hbase 写入流程,如下图所示:

翻开工具箱,海量数据存储处理之各种黑科技

从上图可以看出氛围 3 步骤:

步骤1:获取 RegionServer
Client 获取数据写入的 Region 所在的 RegionServer。

步骤2:请求写 Hlog
请求写 Hlog, Hlog 存储在 HDFS,当 RegionServer 出现异常,需要使用 Hlog 来恢复数据。

步骤3:请求写 MemStore
请求写 MemStore,只有当写 Hlog 和写 MemStore 都成功了才算请求写入完成。MemStore 后续会逐渐刷到 HDFS 中。

5.4 Hbase写入时触发MemStore刷盘的场景

为了提高 Hbase 的写入性能,当写请求写入 MemStore 后,不会立即刷盘。而是会等到一定的时候进行刷盘的操作,总结成如下的几个场景:

1、全局内存控制
这个全局的参数是控制内存整体的使用情况,当所有 memstore 占整个 heap 的最大比
例的时候,会触发刷盘的操作。这个参数是hbase.regionserver.global.memstore.upperLimit,默认为整个 heap 内存的 40%。但这并不意味着全局内存触发的刷盘操作会将所有的 MemStore 都进行输盘,而是通过另外一个参数 hbase.regionserver.global.memstore.lowerLimit 来控制,默认是整个heap 内存的 35%。当 flush 到所有 memstore 占整个 heap 内存的比率为 35%的时候,就停止刷盘。这么做主要是为了减少刷盘对业务带来的影响,实现平滑系统负载的目的。

2、MemStore 达到上限
当 MemStore 的大小达到 hbase.hregion.memstore.flush.size 大小的时候会触发刷
盘,默认 128M 大小。

3、RegionServer 的 Hlog 数量达到上限
前面说到 Hlog 为了保证 Hbase 数据的一致性,那么如果 Hlog 太多的话,会导致故障恢复的时间太长,因此 Hbase 会对 Hlog 的最大个数做限制。当达到 Hlog 的最大个数的时候,会强制刷盘。这个参数是 hase.regionserver.max.logs,默认是 32 个。

4、手工触发
可以通过 hbase shell 或者 java api 手工触发 flush 的操作。

5、关闭 RegionServer 触发
在正常关闭 RegionServer 会触发刷盘的操作,全部数据刷盘后就不需要再使用 Hlog 恢
复数据。

6、Region 使用 HLOG 恢复完数据后触发
当 RegionServer 出现故障的时候,其上面的 Region 会迁移到其他正常的RegionServer 上,在恢复完 Region 的数据后,会触发刷盘,当刷盘完成后才会提供给业务访问。

5.5 Hbase 与 Cassandra 区别

一表小结(HBase vs Cassandra):

HBase Cassandra
语言 Java Java
出发点 BigTable BigTable and Dynamo
License Apache Apache
Protocol HTTP/REST (also Thrift) Custom, binary (Thrift)
数据分布 表划分为多个 region 存在不同 region server 上 改进的一致性哈希(虚拟节点)
存储目标 大文件 小文件
一致性 强一致性 最终一致性,Quorum NRW 策略
架构 master/slave p2p
高可用性 NameNode 是 HDFS 的单点故障点 P2P 和去中心化设计,不会出现单点故障
伸缩性 Region Server 扩容,通过将自身发布到Master,Master 均匀分布 Region 扩容需在 Hash Ring 上多个节点间调整数据分布
读写性能 数据读写定位可能要通过最多 6 次的网络 RPC,性能较低。 数据读写定位非常快
数据冲突处理 乐观并发控制(optimistic concurrencycontrol) 向量时钟
临时故障处理 Region Server 宕机,重做 HLog 数据回传机制:某节点宕机,hash 到该节点的新数据自动路由到下一节点做 hinted handoff,源节点恢复后,推送回源节点。
永久故障恢复 Region Server 恢复,master 重新给其分配 region Merkle 哈希树,通过 Gossip 协议同步 Merkle Tree,维护集群节点间的数据一致性
成员通信及错误检测 Zookeeper 基于 Gossip
CAP 1,强一致性,0 数据丢失。2,可用性低。3,扩容方便。 1,弱一致性,数据可能丢失。2,可用性高。3,扩容方便。

六、小结

翻开工具箱,海量数据存储处理之各种黑科技,完成了。

天天打码,天天进步!!!