Spark Custom Listeners

A Spark application supports custom listeners: by registering one, you can receive job, stage, and task status in real time and forward it to your own monitoring system. The SparkListener trait exposes the following callbacks:

trait SparkListener {
  /**
   * Called when a stage completes successfully or fails, with information on the completed stage.
   */
  def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }

  /**
   * Called when a stage is submitted.
   */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }

  /**
   * Called when a task starts.
   */
  def onTaskStart(taskStart: SparkListenerTaskStart) { }

  /**
   * Called when a task begins remotely fetching its result (will not be called for tasks that do
   * not need to fetch the result remotely).
   */
  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }

  /**
   * Called when a task ends, successfully or with a failure, with information on the completed task.
   */
  def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }

  /**
   * Called when a job starts.
   */
  def onJobStart(jobStart: SparkListenerJobStart) { }

  /**
   * Called when a job ends, successfully or with a failure.
   */
  def onJobEnd(jobEnd: SparkListenerJobEnd) { }

  /**
   * Called when environment properties have been updated.
   */
  def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { }

  /**
   * Called when a new block manager has joined.
   */
  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) { }

  /**
   * Called when an existing block manager has been removed.
   */
  def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { }

  /**
   * Called when an RDD is manually unpersisted by the application.
   */
  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }

  /**
   * Called when the application starts.
   */
  def onApplicationStart(applicationStart: SparkListenerApplicationStart) { }

  /**
   * Called when the application ends.
   */
  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }

  /**
   * Called when the driver receives task metrics from an executor in a heartbeat.
   */
  def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }

  /**
   * Called when the driver registers a new executor.
   */
  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { }

  /**
   * Called when the driver removes an executor.
   */
  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }

  /**
   * Called when the driver receives a block update info.
   */
  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
}

To implement a custom listener, you only need to extend SparkListener and register an instance with the SparkContext. For example:

package com.suning.spark

import org.apache.spark.scheduler.MySparkListener
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Ricky on 2016/4/14.
  */
object JobProcesser {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SparkListenerDemo").setMaster("local")
    val sc = new SparkContext(sparkConf)
    // Register the custom listener before running any jobs.
    sc.addSparkListener(new MySparkListener)

    // Job 1: succeeds, so the listener will see JobSucceeded.
    val rdd1 = sc.parallelize(List(('a', 'c', 1), ('b', 'a', 1), ('b', 'd', 8)))
    val rdd2 = sc.parallelize(List(('a', 'c', 2), ('b', 'c', 5), ('b', 'd', 6)))
    val count = rdd1.union(rdd2).map { x =>
      Thread.sleep(500) // slow the tasks down so the events are easier to watch
      x
    }.count()
    println(count)

    // Job 2: every task throws, so the listener will see JobFailed.
    try {
      rdd1.map(x => 0).map { x =>
        if (x == 0) {
          throw new Exception("my exception")
        }
        x
      }.reduce(_ + _)
    } catch {
      case e: Exception => // expected failure; keep going so sc.stop() still runs
    }
    sc.stop()
  }
}
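Note that MySparkListener below is declared inside Spark's own org.apache.spark.scheduler package rather than a user package. This appears deliberate: in this Spark version JobFailed is private[spark], so a listener defined outside the Spark packages could not pattern-match on it; placing the class in the scheduler package also makes the SparkListener* imports unnecessary.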


package org.apache.spark.scheduler

/**
  * Created by Ricky on 2016/4/14.
  */
class MySparkListener extends SparkListener {
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    println("*************************************************")
    println("app:end")
    println("*************************************************")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
    println("*************************************************")
    println("job:end")
    // JobResult tells us whether the job succeeded or which exception failed it.
    jobEnd.jobResult match {
      case JobSucceeded =>
        println("job:end:JobSucceeded")
      case JobFailed(exception) =>
        println("job:end:JobFailed")
        exception.printStackTrace()
    }
    println("*************************************************")
  }
}
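The opening point of this article was feeding task status to your own monitoring system. Below is a minimal sketch of that idea, assuming the Spark 1.x listener API shown above; Monitoring.send is a hypothetical stand-in for a real metrics client (StatsD, HTTP, Kafka, ...):

package org.apache.spark.scheduler

// Sketch only: the fields used here (taskInfo, taskMetrics, executorRunTime)
// follow the Spark 1.x API.
class MonitoringListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val info = taskEnd.taskInfo
    // taskMetrics can be null for failed tasks, so guard before reading it.
    val runTimeMs = Option(taskEnd.taskMetrics).map(_.executorRunTime).getOrElse(-1L)
    Monitoring.send(Map(
      "stageId"   -> taskEnd.stageId.toString,
      "taskId"    -> info.taskId.toString,
      "host"      -> info.host,
      "success"   -> info.successful.toString,
      "runTimeMs" -> runTimeMs.toString))
  }
}

// Hypothetical metrics client; replace with your monitoring system's API.
object Monitoring {
  def send(fields: Map[String, String]): Unit = println(fields.mkString(", "))
}

Keep callbacks cheap: they run on the driver's listener-bus thread, and a slow listener can cause events to be dropped once the bus queue fills up.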
(Figure: the sources of events flowing into Spark's listener event bus)

The event bus runs on the driver side; the events dispatched to listeners come from the sources shown in the figure above.
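Finally, besides calling sc.addSparkListener in code, Spark 1.3+ can also register listeners declaratively through the spark.extraListeners configuration property: a comma-separated list of class names, each of which must have a zero-argument constructor (or one taking a SparkConf). This lets the listener observe even the earliest application events:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("SparkListenerDemo")
  // Spark instantiates the named classes by reflection while the
  // SparkContext is starting, before any jobs run.
  .set("spark.extraListeners", "org.apache.spark.scheduler.MySparkListener")
val sc = new SparkContext(conf)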