理解Spark运行模式(二)
[url=][/url]
1 from __future__ import print_function 2 3 import sys 4 from random import random 5 from operator import add 6 7 frompyspark.sql import SparkSession 8 9 10 if __name__ == "__main__":11 """12 Usage: pi [partitions]13 """14 spark = SparkSession\15 .builder\16 .appName("PythonPi")\17 .getOrCreate()18 19 partitions = int(sys.argv[1]) iflen(sys.argv) > 1 else 220 n = 100000 * partitions21 22 def f(_):23 x = random() * 2 - 124 y = random() * 2 - 125 return 1 if x ** 2 + y ** 2 <= 1 else 026 27 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)28 print("Pi is roughly %f" % (4.0 * count / n))29 30 spark.stop()[url=][/url]
- Spark Yarn Client向YARN提交应用程序,类似于MapReduce向Yarn提交程序,会将程序文件、库文件和配置文件等上传到HDFS。
- ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster中会运行Spark Driver,并进行SparkContext的初始化。
- ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束。
- 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度。
- ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。
- 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。