MapReduce与Spark On Yarn资源竞争时,为何Spark总能抢占更多资源?

问题简述:我们日常开发中,有时会在yarn队列中发现一个现象,spark任务跑得很舒服,而且占了很多资源,MR任务拿不到资源一直卡在那里,提交MR的同事就很气,抱怨spark抢占资源,实际真的是这样么?我们针对这个问题进行以下探究。常见现象如下,资源紧张时比较常见,看如图中的Spark任务持有的Container普遍多于MR任务持有的Container

MapReduce与Spark On Yarn资源竞争时,为何Spark总能抢占更多资源?

有的小伙伴可能没有耐心,咱们先给下结论!!!(文章结尾还有提升竞争力优化思路)

不是的,并不是Spark抢占资源,而是MR和Spark模型决定的。

MapReduce 多进程模型

    MapReduce 应用程序是让每个Task 动态申请资源,且运行完后马上释放资源,这就会有一个问题,Task就一直重复着申请资源,释放资源。

特点:

  • 每个Task 运行在一个独立的JVM 进程中

  • 可单独为不同类型的Task 设置不同的资源量,目前支持内存和CPU 两种资源

  • 每个Task 运行完后,将释放所占用的资源,这些资源不能被其他Task 复用,即使是同一个作业相同类型的Task。也就是说,每个Task 都要经历“申请资源—> 运行Task –> 释放资源”的过程

MapReduce与Spark On Yarn资源竞争时,为何Spark总能抢占更多资源?     

 

Spark 多线程模型

    构建一个可重用的资源池,然后在这个资源池里运行所有的ShuffleMapTask 和ReduceTask

特点:

  • 每个节点上可以运行一个或多个Executor 服务

  • 每个Executor 配有一定数量的slot,表示该Executor 中可以同时运行多少个ShuffleMapTask 或者ReduceTask

  • 每个Executor 单独运行在一个JVM 进程中,每个Task 则是运行在Executor中的一个线程

  • 同一个 Executor 内部的 Task 可共享内存, 广播的文件或者数据结构只会在每个Executor 中加载一次,而不会像MapReduce 那样,每个Task 加载一次

  • Executor 一旦启动后,将一直运行,且它的资源可以一直被Task 复用, 直到Spark 程序运行完成后才释放退出

MapReduce与Spark On Yarn资源竞争时,为何Spark总能抢占更多资源?

以上已经分别把MR和Spark的模型和特点都写出来了,聪明的小伙伴应该已经猜出来了,为什么MR在资源抢占方面不如Spark,原因就是MR重复申请资源,而Spark抢占资源以后可以重复使用,当Yarn队列中有空余Container时,MR与Spark同时竞争,多次竞争以后自然而然就是Spark持有的Container更多

下面难度稍有增加!!!

有小伙伴又疑问了?Spark与MR竞争时权重到底是不是一致的呢,说实话我也不知道,没有找到类似源码来为我佐证,但是可以分析Spark的Executor与MR的Task启动条件,不管是Executor也好还是Task也好,启动都是需要一定内存的,不同的集群配置不太一样,如需了解更多可以自己去查看Spark或MR参数配置,这里不再赘述,

情况一 单个Executor与Task设置都是4G,队列剩余资源>=4G,Executor与Task都有可能起来,但是两者只能起来一个,具体起来哪一个基本上可以认为是随机,受多种因素影响,因为Executor与Task需要启动时都在不断轮询队列是否有空闲资源。队列剩余资源<4G,Executor与Task都无法启动。

情况二 Executor设置3G   Task设置都是4G,队列剩余资源>=4G,Executor与Task都有可能起来;4G>队列剩余资源>=3G,只有Executor可以启动;队列剩余资源<3G,都无法启动。

情况三 Executor设置4G   Task设置都是3G,与情况二相反

综上所述:我们想在资源紧张时,想让自己的任务快速获得资源似乎有办法可行了(生产上是否使用还是评估一下,不要产生恶意竞争,生产还是根据业务合理分配好),

以下提供两种提升任务竞争力的方法

方法一:提高MR竞争力,减少MR的Map个数,也就意味减少申请资源次数(亲测有效,合并小文件也可以使用此参数),执行hive sql前设置以下参数

set mapred.max.split.size=512000000; -- 每个Map最大输入大小,决定合并后的文件数
set mapred.min.split.size.per.node=100000000; -- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000; -- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;-- 执行Map前进行小文件合并

方法二:提高Spark竞争力,设置比Task小的Executor设置内存,不过不要设置太小,设置更多的Executor

"spark.dynamicAllocation.maxExecutors", "300"    --Spark开启动态参数时使用,理论越大越好

"spark.executor.memory", "4g"    --Spark conf时设置

--num-executors 300  --spark-submit 时使用

--executor-memory 4G --spark-submit 时使用,不宜太小

其他

MR内部Task权重排序

RMContainerAllocator 权重(数值越小优先级越高)

    PRIORITY_FAST_FAIL_MAP=5

    PRIORITY_REDUCE=10

    PRIORITY_MAP=20

    PRIORITY_OPPORTUNISTIC_MAP=19

FAST_FAIL_MAP任务、REDUCE任务和MAP任务的Priority分别是5,10,15,数字越小,优先级越高,因此在FSAppAttempt.assignContainer()中这个请求就会优先被考虑,即,如果有失败的Map任务,这个失败的Map任务优先执行,其次是Reduce任务,最后是正常的Map任务。实际运行情况下,MR任务总是先产生map任务,因此先只提交map任务运行,然后如果有reduce任务,reduce任务将优先于现有的map任务运行,而失败的map任务由于需要重试,其它reduce任务可能在等待这个失败的map任务执行完才能进入下一个阶段,因此它的优先级最高。

获得排序后的Priority,就可以按照优先级,遍历这个Priority的list,取出对应Priority的container请求,尝试进行资源分配。