Spark核心知识拾遗

前言

总结了一些核心的知识点,需要的请收藏点赞。

1.Spark的架构中的基本组件
(1)ClusterManager:在Standalone模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器ResourceManager。
(2)Worker:从节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为NodeManager,负责计算节点的控制。
(3)Driver:运行Application的main( )函数并创建SparkContext。
(4)Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。
(5)SparkContext:整个应用的上下文,控制应用的生命周期。
(6)RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图DAG RDD Graph。
(7)DAG Scheduler:根据作业Job构建基于Stage的DAG,并提交Stage给TaskScheduler。
(8)TaskScheduler:将任务Task分发给Executor执行。
(9)SaprkEnv:线程级别的上下文,存储运行时的重要组件的引用。SaprkEnv内创建并包含如下一些组件的重要引用:
MapOutPutTracker:负责Shuffer元信息的存储。
BroadcastManager:负责广播变量的控制与元信息的存储。
BlockManager:负责存储管理、创建和查找块(管理RDD的物理分区,每个Block就是节点上对应的一个数据块,可以存储在内存或者磁盘,而RDD中的partition是一个逻辑数据块,对应相应的物理块Block)
MetricsSystem: 监控运行时性能指标信息。
SparkConf: 负责存储配置信息

2.Spark的整体流程为:
Client提交应用,Master找到一个Worker启动Driver,Driver向Master或者资源管理器申请资源,之后将应用转化为RDD Graph,在由DAG Scheduler将RDD Graph转化为stage的有向无环图提交给TaskScheduler,由TaskScheduler提交任务给Executor执行。在执行任务过程中,其他组件协同工作,保证整个应用顺利执行。

3.持久化级别
调用cache()将会在executor的内存中持久化保存RDD的每个分区
(1)MEMORY_ONLY 对象在内存
(2)MEMORY_ONLY_SER把分区中的元素序列化为字节数组来实现,多了一份CPU开销,减少垃圾回收
(3)MEMORY_AND_DISK如果数 据集的大小不适合保存到内存中,就将其溢出到磁盘
(4)MEMORY_AND_DISK_SER 如果序列化数据集的大小不适合保存到内存中,就将其溢出到磁盘

4.使用Kryo序列化机制,更高效,设置conf.set ("spark. serializer”, “org.apache.spark.serializer.KryoSerializer”)

5.作业提交流程:
(1)action算子触发提交spark作业,SparkContext调用runJob( )
(2)driver调度程序DAGScheduler TaskScheduler
(3)调度程序把每个阶段中的任务提交到集群

6.YARN 客户端模式 的 driver 在客户端运行,而 YARN 集群模式的 driver 在 YARN 的 application master 集群上运行。

7.RDD的本质在代码中相当于数据的一个元数据结构,存储着数据分区及逻辑结构映射关系,存储着RDD之前的依赖转换关系。

8.RDD的重要内部属性:
(1)分区列表
(2)计算每个分片的函数
(3)对父RDD的依赖列表
(4)对Key-Value对数据类型RDD的分区器,控制分区策略和分区数
(5)每个数据分区的地址列表(如HDFS上数据块的地址)

9.Transformations算子中再将数据类型维度细分为:Value数据类型和Key-Value对数据类型的Transformations算子。Value型数据的算子封装在RDD类中可以直接使用,Key-Value对数据类型的算子封装在PairRDDFunction类中,用户需要引入import org.apache.spark.SparkContext._才能使用。

10.Spark支持重分区,数据通过Spark默认的或者用户自定义的分区器决定数据块分布在哪些节点。支持Hash分区和Range分区等分区策略。

11.glom函数将每个分区形成一个数组,内部实现是返回的GlommedRDD。

12.cache将RDD元素从磁盘缓存到内存,相当于persist( memory_only)函数功能。

13.persist函数对RDD进行缓存,数据存储在哪里由StorageLevel枚举类型确定。

14.mapValues: 针对(Key,Value)类型数据中的Value进行Map操作,而不对Key进行处理。

15.Spark通过AKKA框架进行集群消息通信。

16.Spark的容错通过血统和Checkpoint机制进行容错保证。

17.Spark应用(Application)的基本组件:
(1)Application:用户自定义的Spark程序,用户提交后,Spark为App分配资源,将程序转换并执行。
(2)Driver Program: 运行Application的main( )函数并创建SparkContext。
(3)RDD Graph: RDD是Spark的核心结构,可以通过一系列算子进行操作。
(4)Job: 一个RDD Graph触发的作业,往往由Spark Action算子触发,在SparkContext中通过runJob方法向Spark提交Job。
(5)Stage: 每个Job会根据RDD的宽依赖关系被切分很多Stage,每个Stage中包含一组相同的Task,这一组Task也叫TaskSet。
(6)Task:一个分区对应一个Task,Task执行RDD中对应Stage中包含的算子。Task被封装好后放入Executor的线程池中执行。

18.Driver进程是应用的主控进程,负责应用的解析、切分Stage并调度Task并调用Task到Executor执行,包含DAGScheduler等重要对象。

19.调度配置(静态配置资源分配规则):
(1)Standalone模式:应用使用FIFO(先进先出)的顺序进行调度,每个应用会独占所有节点的资源。配置参数 spark.cores.max决定一个应用可以在整个集群申请的CPU core数,这个参数不是控制单节点可以用多少核,如果没有配置这个参数,在该模式下,默认每个应用可以分配由参数spark.deploy.defaultCores决定的可用核数。
(2)Mesos: 静态配置资源分配策略,配置参数spark.mesos.coarse为true,将Mesos配置为粗粒度调度模式,然后配置参数spark.cores.max来限制应用可以使用的CPU core的最大限制。配置spark.executor.memory限制每个Executor的内存使用量。
(3)YARN: 配置–num-executors每个应用分配多少Executor,配置–executor-memory和–executor-cores来控制应用被分到的每个Executor的内存大小和Executor所占用的CPU核数。这样可以限制用户提交的应用不会过多占用资源,让不用的用户能够共享集群资源,提升YARN吞吐量。
注意:以上三种运行模式都不提供跨应用的共享内存。

20.在默认情况下,Spark的调度器以FIFO(先进先出)方式调度Job的执行,每个Job被切分为多个Stage,第一个Stage优先获取所有可用的资源,接下来第二个Job再获取剩余资源,以此类推。如下图所示。
Spark核心知识拾遗
21.从Spark 0.8开始,可以通过配置FAIR共享调度模式调度Job,在FAIR共享模式调度下,Spark在多Job之间以轮询方式为任务分配资源,多有的任务拥有大致相当的优先级来共享集群的资源,当一个长任务正在执行时,短任务仍可以分配到资源,提交并执行,并且获得不错的响应时间,不需要等待长任务执行完才执行,配置spark.scheduler.mode方式来让运用以FAIR模式调度。FAIR调度器支持将Job分组加入调度池中调度,可以针对不同优先级对每个调度配置不同的调度权重,这种方式允许更重要的Job配置在高优先级池中优先调度。
Spark核心知识拾遗
22.配置调度池,配置参数:
(1)schedulingMode(调度模式):可选FIFO或FAIR方式。
(2)Weight(权重): 这个参数控制在整个集群资源的分配上,这个调度池相对其他调度池的优先级高低。
(3)minShare : 这个参数代表多少个CPU核,决定整体调度的调度池能给待调度的调度池分配多少资源就可以满足调度池的资源需求,剩余的资源还可以继续分配给其他调度池。
通过conf/fairscheduler.xml文件配置调度池的属性,同时需要在程序的SparkConf对象中配置属性: conf.set( “spark.scheduler.allocation.file”, “/path/to/file” )

Spark核心知识拾遗
23.Spark通过集中方式实现进程通信,包括Actor的消息模式、Java NIO 和netty的OIO。

24.Spark可以使用Java的序列化库,也可以使用Kyro序列化库,Kyro具有紧凑、快速、轻量的优点,允许自定义序列化方法,扩展性好。

25.Spark的压缩方式
(1)Snappy:提供了更高的压缩速度,压缩算法:Zippy,压缩比达到20%~100%,经过PB级别的大数据的考验,稳定性好。
(2)LZF: 提供了更高的压缩比,压缩算法:Ning-Compress。

26.Spark配置压缩:(1)在spark-env.sh 文件中配置 export SPARK_JAVA_OPTS="-Dspark.broadcast.compress" (2)在应用程序中配置 conf.set ( “spark.broadcast.compress", true )

27.Spark通过序列化将链式分布的数据转化为连续分布的数据,这样能进行分布式的进程间数据通信,或者在内存中进行数据的压缩操作,提升Spark性能。

28.Spark通过压缩能减少数据的占用内存、IO、网络数据传输开销。

29.Spark I/O管理
(1)通信层:I/O模块也采用Master-Slave结构来实现通信层的架构,Master 和 Slave之间传输控制信息、状态信息。
(2)存储层:Spark的块数据需要存储到内存或者磁盘,有可能还需要传输到远端机器,这是由存储层完成的。

30.Spark数据写入流程
Spark核心知识拾遗
31.Spark的通信框架AKKA: 基于Scal开发,用于编写Actor应用。Actor建立一个消息队列,每次收到消息后放入队列,读取消息也是从队列中读取,这个过程是循环的。
AKKA优势:
(1)并行和分布式,采用异步通信和分布式架构
(2)可靠性:有监控恢复机制
(3)高性能:单机每秒可发送5千万个消息,1GB内存可创建保持250万个Actor对象
(4)去中心:无中心节点架构
(5)可扩展性:分布式下进行,线性扩充计算能力

32.Spark容错
(1)Checkpoint通过冗余数据来缓存数据
(2)血统通过相当粗粒度的记录更新操作来实现容错