spark job server原理
配置相关
- settings.sh
- 功能:配置环境变量
- APP_USER/APP_GROUP:作业提交用户和组
- JMX_PORT:java jmx端口,通常在aws或者其他容器里打开
- INSTALL_DIR:sjs所做目录
- LOG_DIR:日志路径
- PIDFILE:启动sjs,产生pid存放的文件名
- JOBSERVER_MEMORY:启动spark作业的driverMem
- SPARK_VERSION:指定spark版本
- SCALA_VERSION:scala版本
- SPARK_HOME、SPARK_LOG_DIR、SPARK_CONF_DIR:spark相关配置
- YARN_CONF_DIR、HADOOP_CONF_DIR:yarn配置
- local.conf
- spark.master:指定spark提交的类型,yarn-client、local[4]等
- spark.jobserver
- port:指定jobServer的启动端口,使用此端口进行作业提交和监控等
- context-per-jvm:是否每个context都启动一个独立的进程
- jobdao:指定处理jobs、jars等逻辑的类
- datadao:通过
POST/data
上传到sjs的文件存放路径 - sqldao:当
jobdao
指定为JobSqlDAO
时使用- slick-driver
- jdbc-driver
- rootdir:H2 driver存放数据目录
- jdbc:连接
- dbcp:连接池
- result-chunk-size
- 作业返回值使用分块传输,每块大小
- spark.contexts:启动sjs自动加载的context配置
- 名字
- context-settings:启动context,即app,相关配置
- num-cpu-cores:core个数
- memory-per-node:executor的mem,eg 512m、1G
- dependent-jar-uris:依赖的jar包,list形式,或者字符串,使用逗号隔开
- [“file:///xxx.jar”,”file:///xxx2.jar”],或者”file:///xxx.jar,file:///xxx2.jar”
- 其他的spark配置,去掉前缀spark即可
- 如:spark.speculation可配置为speculation
- server_start.sh
- 启动spark job server
- manager_start.sh
- context-per-jvm设置为true时,才会使用此脚本,用于启动context
使用
- 启动
- 运行脚本server_start.sh即可
- 初始化context
curl -d "" 'ip:port/contexts/roncen_test_context?context-factory=spark.jobserver.context.HiveContextFactory'
- 上传jar包
curl -H "Content-Type: application/java-archive" --data-binary @/home/vipshop/platform/sjs_2.0/jars/job-server-extras_2.11-0.7.0-SNAPSHOT.jar ip:8091/binaries/sql
- 提交作业
curl -d "sql_file=\"hdfs://bipcluster/spark/sql/test_cassandra.sql\"" 'ip:8091/jobs?appName=sql&classPath=spark.jobserver.vip.VipHiveJob&context=roncen_test_context&sync=false'
- 通过jobId获取job运行状态
curl -v 'ip:8091/jobs/xxx
- 删除context
curl -X DELETE "ip:8091/contexts/roncen_test_context"
问题记录
- server返回失败问题
- [delete context时,context上的job并未结束]
- 时不时返回
The server was not able to produce a timely response to your request
问题1:The server was not able to produce a timely response to your request
- 探测方法
curl -v 'ip:port/jobs/b2ee01d2-a495-43a3-a0e5-f2ba82330211'
- 探测对应的jobId状态
- 正常情况下,返回:”RUNNING”|”ERROR”|”FINISHED”
- 获取job状态逻辑
-
spark.jobserver.WebApi
中接收http的GET请求GET /jobs/<jobId>
- 通过akka从
jobInfoActor
中获取job状态GetJobStatus(jobId)
- 从
jobDao
中获取对应jobId的信息-
JobSqlDao.getJobInfo()
中,从数据库中查询对应job的信息,返回
-
- 从
- 返回格式
application/json
给客户端- jobId不存在:返回
No such job ID xxxx
- 存在:
- 构造返回格式:
jobId: , startTime: , classPath: , context: , duration: , status:
- 通过akka从
JobInfoActor
中获取job结果GetJobResult
- 通过
AkkaClusterSupervisorActor
的GetResultActor(context)
得到对应的resultActor
- 通过contextName得到对应的resultActor -
- 通过
resultActor
的GetJobResult(jobId)
得到最后的结果
- 通过
- 返回客户端结果
- 构造返回格式:
- jobId不存在:返回
-
初始化context步骤
- 命令示例
curl -d "" 'ip:port/contexts/sql-context-for-update-on-sale-85?context-factory=spark.jobserver.context.HiveContextFactory'
- 通过http调用
WebApi
中的POST /contexts/<contextName>
-
通过akka调用
AkkaClusterSupervisorActor
的AddContext(cName, config)
- 判断是否存在,如果存在则返回
ContextAlreadyExists
-
调用方法
startContext()
- 生成contextActorName,”jobManager-” + uuid
- 在${LOG_DIR}路径下创建contextDir路径,生成对应文件
context.conf
- 存放
actorname
、context-factory
等基础信息
- 存放
- 生成执行命令:
${deploy.manager-start-cmd} contextDir cluster.selfAddress(akka地址)
,即./manager_start.sh xxx
,此命令是在后台执行的,命令后有&
- 判断返回值,如果失败,返回
ContextInitError
,如果成功,将其放入contextInitInfos
的map中 - 执行上述生成的命令
- 执行主类
spark.jobserver.JobManager
- 获取
context.conf
文件中的配置信息 - 初始化jobDao,
spark.jobserver.jobdao
配置,这里为spark.jobserver.io.JobSqlDAO
- 初始化JobDAOActor,命名为
dao-manager-jobmanager
- 初始化jobManager,命名为
${context.actorname}
- join到cluster中
Cluster(system).join(clusterAddress)
????- 发送
ActorIdentity(memberActors, actorRefOpt)
到AkkaClusterSupervisorActor
- 发送
- 执行主类
-
AkkaClusterSupervisorActor
收到此消息后- 遍历当前cluster所有的actorRef
- 如果返回的actorName以
jobManager
开头则执行以下步骤,否则不处理 - 从
contextInitInfos
中remove当前actorName对应的actor - 执行方法
initContext()
- 初始化
JobResultActor resultActor
- 通过akka将
resultActor
发送给正在处理的actor,即发送消息JobManagerActor.Initialize(Some(resultActor))
到JobManagerActor
-
JobManagerActor
得到消息后,进行如下处理- 初始化
JobStatusActor
- 得到
JobResultActor
,如果resultActor
没有,则初始化一个 - 加载
dependent-jar-uris
指定的jar包 - 生成contextFactory,生成context
- 生成
JobCacheImpl
,用于缓存job信息 - 将
dependent-jar-uris
指定的jar包放入sparkContext.addJar()中 - 返回
Initialized(contextName, resultActor)
,如果失败,则返回InitError(t)
- 初始化
-
- 得到返回值
- 如果成功,则将当前context放到
contexts
中,即contexts(ctxName) = (ref, resActor)
- 如果成功,则将当前context放到
- 如果返回的actorName以
- 遍历当前cluster所有的actorRef
返回成功/失败
- 判断是否存在,如果存在则返回
- 返回json类型结果
提交job到context中
- 命令示例:
curl -d "sql = \"show databases\"" 'ip:port/jobs?appName=sql&classPath=spark.jobserver.HiveTestJob&context=sql-context-for-gs-sku-check-85&sync=true'
- 通过http调用
WebApi
中的POST /jobs
- 通过akka中
AkkaClusterSupervisorActor
的GetContext(name)
,得到对应context的jobManager
- 如果没有得到,则返回
NoSuchContext
或者ContextInitError(err)
- 通过
jobManager
进行与context进程通信,发送JobManagerActor.StartJob
,用于提交作业- 加载未加载的jar包
- 调用
startJobInternal()
- 通过jobSqlDao,获取当前appName上次提交作业的时间和type,如果没有则返回错误
- 随机生成randomUUID,作为jobId
- 通过
sparkContextFactory.loadAndValidateJob()
生成jobContainer- 通过classPath/appname,在
JobCacheImpl
中获取JobJarInfo
,并初始化- 如果cache中没有,会通过akka发送消息
GetBinaryPath()
,从jobSqlDao中获取jar包 - 初始化构造函数,将其放入JobContainer中,返回
- 如果cache中没有,会通过akka发送消息
- 通过classPath/appname,在
- 判断返回值,如果为Good(container),则继续,否则返回错误
- 将结果发送给
JobResultActor
和JobStatusActor
-
JobStatusActor
- 发送消息
SaveJobInfo
到jobSqlDao,将信息存入元数据库
- 发送消息
-
- 调用方法
getJobFuture()
返回结果- 判断当前runningJob是否大于最大运行job,如果是则返回
NoJobSlotsAvailable(maxRunningJobs)
,否则继续 - 使用scala的Future,另起线程执行job
- 设置SparkEnv
- 发送消息
JobInit
到JobStatusActor
- 通过方法
HiveTestJob.validate()
判断当前job是否正常- 如果正常
- 发送消息
JobStarted
到JobStatusActor
- 设置sparkContext的jobGroup为当前jobId,
sc.setJobGroup(jobId, xxx)
- 调用接口,执行job,
HiveTestJob.runJob(jobC, jobEnv, jobData)
- 发送消息
- 否则发送
JobValidationFailed
- 如果正常
- 线程执行结束
- 成功
- 发送
JobFinished
到JobStatusActor
- 发送
JobResult
到JobResultActor
- 发送
- 失败
- 发送
JobErroredOut
到JobStatusActor
- 发送
- 成功
- 判断当前runningJob是否大于最大运行job,如果是则返回
- 判断返回结果,并返回给客户端对应的http reponse
JobResult(jobId, res)
JobErroredOut
-
JobStarted(_, jobInfo)
- 通过akka发送给
JobInfoActor
消息StoreJobConfig(jobInfo.jobId, postedJobConfig)
-
JobInfoActor
得到消息后,通过jobDao.saveJobConfig(jobId, jobConfig)
存储信息,这里为JobSqlDao
-
- 通过akka发送给
JobValidationFailed
NoSuchApplication
NoSuchClass
WrongJobType
WrongJobType
NoJobSlotsAvailable
ContextInitError
- 如果没有得到,则返回