spark内核揭秘-spark资源调度系统个人理解(待续)

开发Spark应用程序的大致流程

基于Spark写Application->资源调度器->任务调度器->分布式并行计算

此次我们只讨论资源调度:

资源调度的原理

以下是Standalone集群的一个申请资源的过程
spark内核揭秘-spark资源调度系统个人理解(待续)

  1. Mater节点运行start-all.sh命令后开启集群,它会通过扫描slaves配置文件中的每一个IP,通过ssh协议连接他们开启每一个Worker,当Worker开启成功后,他们会通过spark-env.sh中Master的IP和端口发送成功启动的信息.
  2. 当开启集群的时候,Worker会向Master发送他们各自的资源信息,Master把他们的信息全部封装到一个workers = new HashSetWorkerInfo,还会存放他们的IP和端口。
  3. 客户端client以cluster方式提交一个Application的话,spark-submit --master spark://node01:7077 --deploy-mode cluster …,首先client节点会启动一个spark-submit进程,它为Driver向Master申请资源,Master中的waitingDrivers = new ArrayBuffer[DriverInfo]对象会存放为Driver申请资源的信息。
  4. Driver正常启动后,spark-submit进程被杀掉,Driver开始向Master申请Application的资源,Master中的waitingApps = new ArrayBuffer[ApplicationInfo]来存放为Application申请资源的信息
  5. Executor进程开启

waitingDrivers集合

当waitingDrivers集合中元素不为空,说明有用户单向Master申请资源了,此使用该查看当前集群的资源情况(产看一下works集合),找到符合要求的节点,启动Driver,当Driver正常启动,这个申请资源的信息从waitingDrivers中删除掉。

waitingApps集合

当waitingApps集合不为空,说明有Driver向Master为当前的Application申请资源,查看集群的资源情况(Workers集合)找到合适的Worker节点,开启Executor进程;默认情况下,每一个Worker为当前的Application只是启动一个Executor进程。这个Executor会使用1G内存和这个Worker所管理的所有的core。

对waitingApps、waitingDrivers集合的监控

因为他们这两个集合一旦发生变化,就证明有人向Master申请了资源,Master就必须得知道具体情况,所以需要时时监控他们。

所以Master里有一个schedule()方法,每当这两个集合中添加元素的时候,就会反调这个方法,这个方法里有2套逻辑,分别对应这两个集合,当某个集合反调这个函数时,它会按照上述处理过程来处理。

资源调度的结论

  1. 默认情况下,每一个Worker为当前的Application只会启动一个Executor(他默认使用当前Worker所能管理的所有的核,1G内存)
  2. 如果想要在一个Worker上启动多个Executor进程,再提交Application的时候,要指定executor使用的核数;spark-submit --executor-cores
  3. 默认情况下,Executor的启动,时轮训方式启动的,轮训的启动方式在一定的程度上有利于数据的本地化

为什么轮训的方式比阻塞的方式好?

轮训方式在一定的程度上有利于数据的本地化,若全开在一台节点上,此时计算的数据有可能不在此节点上,此时数据要走网络传输,这就导致数据找计算。

轮训方式启动Executor的公式

–executor-cores:每个executor需要的核;ec
–executor-memory:每个executor需要内存;em
–total-executor-cores:这个Application一共需要多少个核;tec
worker num:Worker节点个数;wn
worker memory:Worker节点的内存;wm
worker core:Worker节点的核;wc

min(min(wm/em,wc/ec)*wn,tec/ec)

Spark运行在yarn集群上的2种提交方式

client

cluster