Spark in action on Kubernetes - Spark Operator 的原理解析


作者 | 阿里云智能事业群技术专家 莫源

Spark Operator 的内部实现

在深入解析 Spark Operator 之前,我们先补充一些关于 Kubernetes operator 的知识。2018 年可以说是 Kubernetes operator 泛滥的一年,各种 operator 如雨后春笋般出现。operator 是扩展 Kubernetes 以及与 kubernetes 集成的最佳方式之一。

在 Kubernetes 的设计理念中,有很重要的一条就是进行了抽象,比如对存储进行抽象、对应用负载进行抽象、对接入层进行抽象等等。每个抽象又对应了各自生命周期管理的 controller,开发者提交的 Yaml 实际上是对抽象终态的描述,而 controller 会监听抽象的变化、解析并进行处理,最终尝试将状态修正到终态。

Spark in action on Kubernetes - Spark Operator 的原理解析

那么对于在 Kubernetes 中未定义的抽象该如何处理呢,答案就是 operator。一个标准 operator 通常包含如下几个部分:


  • CRD 抽象的定义,负责描述抽象所能包含的功能;
  • CRD Controller ,负责解析 CRD 定义的内容以及生命周期的管理;
  • clent-go 的 SDK,负责提供代码集成时使用的 SDK。
Spark in action on Kubernetes - Spark Operator 的原理解析

有了这个知识储备,那么我们回过头来看 Spark Operator 的代码,结构基本就比较明晰了。

  • 核心的代码逻辑都在 pkg 下,其中 apis 下面主要是定义了不同版本的 API;
  • client 目录下主要是自动生成的 client-go 的 SDK;
  • crd 目录下主要是定义的两个自定义资源 sparkapplication 和 scheduledsparkapplication 的结构。
  • controller 目录下主要定义的就是这个 operator 的生命周期管理的逻辑;
  • config 目录下主要处理 spark config 的转换。

了解一个 Operator 能力最快捷的方式,就是查看 CRD 的定义。在 Spark Operator 中定义了 sparkapplication 和 scheduledsparkapplication 两个 CRD,他们之间有什么区别呢?

sparkapplication 是对常规 spark 任务的抽象,作业是单次运行的,作业运行完毕后,所有的 Pod 会进入 Succeed 或者 Failed 的状态。

而 scheduledsparkapplication 是对离线定时任务的一种抽象,开发者可以在 scheduledsparkapplication 中定义类似 crontab 的任务,实现 spark 离线任务的周期性定时调度。

Spark in action on Kubernetes - Spark Operator 的原理解析

上面这张图是 Spark 中 Kubernetes 的集成图,也就是说当我们通过 spark-submit 提交作业的时候,会自动生成 driver pod 与 exector pods。那么引入了 Spark Operator 后,这个流程变成了什么呢?

func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1beta1.SparkApplication {
// prometheus的监控指标的暴露
appToSubmit := app.DeepCopy()
if appToSubmit.Spec.Monitoring != nil && appToSubmit.Spec.Monitoring.Prometheus != nil {
if err := configPrometheusMonitoring(appToSubmit, c.kubeClient); err != nil {
glog.Error(err)
}
}
// 将CRD中的定义转变为spark-submit的命令
submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit)
if err != nil {
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.FailedSubmissionState,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
return app
}
// 在operator容器内通过spark-submit提交作业
submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, appToSubmit))
if err != nil {
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.FailedSubmissionState,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
c.recordSparkApplicationEvent(app)
glog.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
return app
}
// 因为Pod的状态也会被Spark Operator进行观测,因此driver pod宕掉会被重新拉起
// 这是和直接跑spark-submit的一大区别,提供了故障恢复的能力。
if !submitted {
// The application may not have been submitted even if err == nil, e.g., when some
// state update caused an attempt to re-submit the application, in which case no
// error gets returned from runSparkSubmit. If this is the case, we simply return.
return app
}
glog.Infof("SparkApplication %s/%s has been submitted", app.Namespace, app.Name)
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.SubmittedState,
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
ExecutionAttempts: app.Status.ExecutionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
c.recordSparkApplicationEvent(app)
// 通过service暴露spark-ui
service, err := createSparkUIService(app, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIServiceName = service.serviceName
app.Status.DriverInfo.WebUIPort = service.nodePort
// Create UI Ingress if ingress-format is set.
if c.ingressURLFormat != "" {
ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
}
}
}
return app
}

其实到此,我们就已经基本了解 Spark Operator 做的事情了,首先定义了两种不同的 CRD 对象,分别对应普通的计算任务与定时周期性的计算任务,然后解析 CRD 的配置文件,拼装成为 spark-submit 的命令,通过 prometheus 暴露监控数据采集接口,创建 Service 提供 spark-ui 的访问。然后通过监听 Pod 的状态,不断回写更新 CRD 对象,实现了 spark 作业任务的生命周期管理。

Spark Operator 的任务状态机

当我们了解了 Spark Operator 的设计思路和基本流程后,还需要深入了解的就是 sparkapplication 的状态都包含哪些,他们之间是如何进行转换的,因为这是 Spark Operator 对于生命周期管理增强最重要的部分。

Spark in action on Kubernetes - Spark Operator 的原理解析

一个 Spark 的作业任务可以通过上述的状态机转换图进行表示,一个正常的作业任务经历如下几个状态:

New -> Submitted -> Running -> Succeeding -> Completed

而当任务失败的时候会进行重试,若重试超过最大重试次数则会失败。也就是说如果在任务的执行过程中,由于资源、调度等因素造成 Pod 被驱逐或者移除,Spark Operator 都会通过自身的状态机状态转换进行重试。

Spark Operator 的状态排查

我们已经知道了 Spark Operator 最核心的功能就是将 CRD 的配置转换为 spark-submit 的命令,那么当一个作业运行不预期的时候,我们该如何判断是哪一层出现的问题呢?

首先我们要判断的就是 spark-submit 时所生成的参数是否是预期的,因为 CRD 的 Yaml 配置虽然可以增强表达能力,但是提高了配置的难度与出错的可能性。

func runSparkSubmit(submission *submission) (bool, error) {
sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
if !present {
glog.Error("SPARK_HOME is not specified")
}
var command = filepath.Join(sparkHome, "/bin/spark-submit")
cmd := execCommand(command, submission.args...)
glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
if _, err := cmd.Output(); err != nil {
var errorMsg string
if exitErr, ok := err.(*exec.ExitError); ok {
errorMsg = string(exitErr.Stderr)
}
// The driver pod of the application already exists.
if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
return false, nil
}
if errorMsg != "" {
return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
}
return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
}
return true, nil
}

默认情况下 Spark Operator 会通过 glog level=2 等级对外输出每次作业提交后转换的提交命令。而默认情况下,glog 的 level 即为 2,因此通过检查 Spark Operator 的 Pod 日志可以协助开发者快速排查问题。

此外在 sparkapplication 上面也会通过 event 的方式进行状态的记录,上述状态机之间的转换都会通过 event 的方式体现在 sparkapplication 的对象上。掌握这两种方式进行问题排查,可以节省大量排错时间。

总结

使用 Spark Operator 是在 Kubernetes 上实践 spark 的最佳方式,和传统的 spark-submit 相比提供了更多的故障恢复与可靠性保障,并且提供了监控、日志、UI 等能力的集成与支持。

本文转自公众号:CNCF