Spark编程模型经典解析(一)
从Hadoop MR到Spark
回顾hadoop -MR计算过程
1. 文件 通过 split,split的原因:MapReduce是并行计算的,如果一个文件很大,500G的大小,做不了分片,那如何并行运行。所以要分成很多的split,每一个split交给一个map来处理。
2. Map运行处理之后,存储在内存里面,内存满了,会刷到磁盘上形成文件。
3. 然后为了确定map处理的结果交给哪一个reduce来处理,从而有了partition,默认是hash partition。根据key计算是哪一个分区,就写到哪一个分区。到磁盘之后,形成小文件 。
4. 文件多了要进行合并(merge)
5. 最后reduce完之后,进行输出,写到HDFS,HBASE,或其他存储的地方。
Hadoop的MR的并行编程模型
Hadoop的MapReduce先将数据划分为多个key/value键值对。然后输入Map框架来得到新的key/value对,这时候只是中间结果,这个时候的value值是个值集合。再通过同步障(为了等待所有的Map处理完),这个阶段会把相同key的值收集整理(Aggregation&Shuffle)在一起,再交给Reduce框架做输出组合,如下图中每个Map输出的结果,有k1,k2,k3,通过同步障后,k1收集到一起,k2收集到一起,k3收集到一起,再分别交给Reduce,通过Reduce组合结果。
Hadoop的MR的完整编程模型和框架
下图是MapReduce的完整编程模型和框架,比模型上多加入了Combiner和Partitioner。
1)Combiner
Combiner可以理解为一个小的Reduce,就是把每个Map的结果,先做一次整合。例如图3中第三列的Map结果中有2个good,通过Combiner之后,先将本地的2个good组合到了一起(红色的(good,2))。好处是大大减少需要传输的中间结果数量量,达到网络数据传输优化,这也是Combiner的主要作用。
2)Partitioner
为了保证所有的主键相同的key值对能传输给同一个Reduce节点,如图3中所有的good传给第一个Reduce前,所有的is和has传给第二个Reduce前,所有的weather,the和today传到第三个Reduce前。MapReduce专门提供了一个Partitioner类来完成这个工作,主要目的就是消除数据传入Reduce节点后带来不必要的相关性。
MR 对比Spark
Spark编程模型
核心概念
对比一下MR里的概念
1.Application
Appliction的概念和hadoop MR中的有些相似,都是指用户编写的Spark应用程序,其中包括一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码。
2.Driver
使用Driver这一个概念的分布式框架很多,比如hive等,Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭,通常用SparkContext代表Driver。
SparkContext
Spark 应用程序的入口,负责调度各个运算资源,协调各个 Worker
Node 上的 Executor
某个Application运行在worker节点上的一个进程, 该进程负责运行某些Task, 并且负责将数据存到内存或磁盘上,每个Application都有各自独立的一批Executor, 在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutor Backend,类似与hadoop MR中的YarnChild。一个CoarseGrainedExecutor Backend有且仅有一个Executor对象, 负责将Task包装成taskRunner,并从线程池中抽取一个空闲线程运行Task, 这个每一个CoarseGrainedExecutor Backend能并行运行Task的数量取决与分配给它的cpu个数。
4.Cluter Manager
指的是在集群上获取资源的外部服务。目前有三种类型。
[1].Standalon : spark原生的资源管理,由Master负责资源的分配,可以在EC2上运行
[2].Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架。
[3].Hadoop Yarn: 主要是指Yarn中的ResourceManager。
5.Worker
集群中任何可以运行Application代码的节点,类似与Yarn中的NodeManager节点, 在Standalone模式中指的是通过slave文件配置的Worker节点,在Spark on Yarn模式下就是NoteManager节点。
6.Task
被送到某个Executor上的工作单元,但hadoopMR中的MapTask和ReduceTask概念一样,是运行Application的基本单位,多个Task组成一个Stage,而Task的调度和管理等是由TaskScheduler负责。
7.Job
包含多个Task组成的并行计算,往往由Spark Action触发生成, 一个Application中往往会产生多个Job。 一个Action对应的一个job
8.Stage
每个Job会被拆分成多组Task, 作为一个TaskSet, 其名称为Stage,Stage的划分和调度是有DAGScheduler来负责的,Stage有非最终的Stage(Shuffle Map Stage)和最终的Stage(Result Stage)两种,Stage的边界就是发生shuffle的地方。
9. RDD
Spark的基本计算单元,可以通过一系列算子进行计算(主要由Transformation和Action操作),同时RDD是Spark最核心的东西,他表示已被分区、被序列化的、不可变的、有容错的并且能够被并行操作的数据集合。其存储级别可以是内存,也可以是磁盘,可通过spark.storage.StoragerLevel属性来配置。
RDD里面可以有很多的partition,执行的执行的时候,一个partition对应一个map操作,并行计算,效率就会越高,越快!(资源不是瓶颈时)
RDD接口
RDD的本质特征
RDD 之partitions
默认的partition:不指定时,执行的时候,默认分配到几个CPU核的个数
RDD之preferredLocations
RDD之dependencies
宽依赖
或称为为ShuffleDependency,与Hadoop MR的Shuffle的数据依赖相似,宽依赖需要计算所有父RDD对应分区的数据,然后在节点之间进行shuffle。
窄依赖
或称为NarrowDependency, 某个具体的RDD,其分区partition a最多子Rdd中一个分区partition b依赖,此种情况只有Map任务, 是不需要发送shuffle过程的, 窄依赖又分为1:1和N:1两种。
RDD之compute
RDD之partitioner
RDD之lineage
典型RDD特征
不同角度看RDD
RDD Graph
RDD是Spark的核心结构, 可以通过一系列算子进行操作(主要有Transformation和Action操作)。当RDD遇到Action算子时,将之前的所有算子形成一个有向无环图(DAG)。再在Spark中转化为Job,提交到集群执行。一个App中可以包含多个Job
10.共享变量
在Spark Application运行期间,可能需要一些共享变量, 提供给Task或Driver使用,Spark提供了两种共享变量,一个可以缓存到各个节点的广播变量;另一种是只支持加法操作,可以实现求和的累加变量。
11.DAGScheduler
根据Job构建基于Stage的DAG,并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系。
12.TASKSedulter
将TaskSET提交给worker运行,每个Executor运行什么Task就是在此处分配的。
Spark application 编程模型
:一个partition:
:一个RDD
:可以把RDD存储起来
拿wordcount的例子举例:
1. 读取数据源的时候,变成了一个RDD ,可以叫创建操作
2. 一个RDD变成了另一个RDD ,叫转换操作
3. 转换操作,不是立即执行的,是延迟调度
4. 到action操作才真正执行:例如,保存存储,count等操作才执行,叫Action操作