Spark的任务调度
文章目录
1、学习任务调度前需要了解的知识点
1.1、Spark中的一些专业术语
1.1.1、任务相关
Application:用户写的应用程序(DriverProgram + ExecutorProgram)。
Job:一个action类算子触发的操作。
stage:一组任务,例如:map task。
task:(thread)在集群运行时,最小的执行单元。
1.1.2、资源相关
Mstaer:资源管理主节点。
Worker:资源管理从节点。
Executor:执行任务的进程。
ThreadPool:线程池(存在于Executor进程中)
1.2、RDD中的依赖关系
1.2.1、宽依赖
父RDD与子RDD,partition之间的关系是一对多,一般来说,宽依赖都会导致shuffle。(默认情况下,groupByKey返回的RDD的分区数与父RDD是一致的。如果你在使用groupByKey的时候,传入一个Int类型的值,那么分区数就是这个值。)
1.2.2、窄依赖
父RDD与子RDD,partition之间的依赖关系是一对一,这种依赖关系不会有shuffle。
1.2.3、宽窄依赖的作用
宽窄依赖的作用就是:把job切割成一个个的stage。
切割stage的过程:(stage与 stage之间是宽依赖,stage内部是窄依赖)
那么接下来问题来了,为什么我们需要把job切割成stage呢?
答:把job切割成stage之后,stage内部就可以很容易的划分出一个个的task任务(用一条线把task内部有关联的子RDD与父RDD串联起来),然后就可把task放到管道中运行了。
下一个问题:RDD存储的到底是什么样的计算逻辑呢?下面用一个例子来解释:
在这个Application中有一个job,一个stage,2个task。
task0:这条线贯穿所有的partition中的计算逻辑,并且以递归函数展开式的形式整合到一起,fun2(fun1(textFile(b1))),最好将这个计算逻辑发送到b1或者其副本所在节点。task1也是相同的逻辑。同时注意:task的计算模式是pipeline的计算模式。
1.3、需要了解的几个问题
1.3.1、stage中的每一个task(管道计算模式)会在什么时候落地磁盘?
1)、如果stage后面是跟的是action类算子
saveAsText:将每一个管道计算结果写入到指定目录。
collect:将每一个管道计算结果拉回到Driver端内存中。
count:将每一个管道计算结果,统计记录数,返回给Driver。
2)、如果stage后面是跟的是stage
在shuffle write阶段会写磁盘。(为什么在shuffle write阶段写入磁盘?防止reduce task拉取文件失败,拉取失败后可以直接在磁盘再次拉取shuffle后的数据)
1.3.2、Spark在计算的过程中,是不是特别消耗内存?
不是。Spark是在管道中计算的,而管道中不是特别耗内存。即使有很多管道同时进行,也不是特别耗内存。
1.3.3、什么样的场景最耗内存?
使用控制类算子的时候耗内存,特别是使用cache时最耗内存。
1.3.4、如果管道中有cache逻辑,他是如何缓存数据的?
有cache时,会在一个task运行成功时(遇到action类算子时),将这个task的运行结果缓存到内存中。
1.3.5、RDD(弹性分布式数据集),为什么他不存储数据还叫数据集?
虽然RDD不具备存储数据的能力,但是他具备操作数据的能力。
2、任务调度
2.1、任务调度的流程
1)、DAGScheduler:根据RDD的宽窄依赖关系将DAG有向无环图切割成一个个的stage,将stage封装给另一个对象taskSet,taskSet=stage,然后将一个个的taskSet给taskScheduler。
2)、taskScheduler:taskSeheduler拿倒taskSet之后,会遍历这个taskSet,拿到每一个task,然后去调用HDFS上的方法,获取数据的位置,根据获得的数据位置分发task到响应的Worker节点的Executor进程中的线程池中执行。
3)、taskSchedule:taskSchedule节点会跟踪每一个task的执行情况,若执行失败,TaskSche会尝试重新提交,默认会重试提交三次,如果重试三次依然失败,那么这个task所在的stage失败,此时TaskSchedule向DAGSchedule做汇报。
4)、DAGScheduler:接收到stage失败的请求后,,此时DAGSheduler会重新提交这个失败的stage,已经成功的stage不会重复提交,只会重试这个失败的stage。
(注:如果DAGScheduler重试了四次依然失败,那么这个job就失败了,job不会重试)
2.2、配置信息使用的三种方式
1)、在代码中使用SparkConf来配置。
2)、在提交的时候使用 --conf来配置。
spark-submit --master --conf k=v 如果要设置多个配置信息的值,需要使用多个–conf
3)、在spark的配置文件spark-default.conf中配置。
2.3、什么是挣扎(掉队)的任务?
当所有的task中,75%以上的task都运行成功了,就会每隔一百秒计算一次,计算出目前所有未成功任务执行时间的中位数*1.5,凡是比这个时间长的task都是挣扎的task。
2.4、关于任务调度的几个小问题
2.4.1、如果有1T数据,单机运行需要30分钟,但是使用Saprk计算需要两个小时(4node),为什么?
1)、发生了计算倾斜。大量数据给少量的task计算。少量数据却分配了大量的task。
2)、开启了推测执行机制。
2.4.2、对于ETL(数据清洗流程)类型的数据,开启推测执行、重试机制,对于最终的执行结果会不会有影响?
有影响,最终数据库中会有重复数据。
解决方案:
1)、关闭各种推测、重试机制。
2)、设置一张事务表。
3、Spark更多内容
如果想了解Spark更多内容,推荐阅读《Spark学习总结》。