Spark,在EMR中抛出SparkException时出现错误行为

问题描述:

我在使用YARN作为资源管理器和2个节点的EMR中运行Spark任务。如果我的条件不符合,我需要有目的地失败该步骤,因此下一步不会按照配置执行。 为了实现这一点,我在dynamoDB中插入日志消息后抛出了一个自定义异常。Spark,在EMR中抛出SparkException时出现错误行为

它运行良好,但Dynamo中的记录插入了两次。

以下是我的代码。

if(<condition>) { 
    <method call to insert in dynamo> 
    throw new SparkException(<msg>); 
    return; 
} 

如果我删除行以抛出异常,它工作正常,但步骤完成。

如何在不获取日志消息两次的情况下使步骤失败。

感谢您的帮助。

问候, Sorabh

可能插入您的发电机消息的原因两次是因为您的错误条件被击中,由两个不同的遗嘱执行人处理。斯帕克将工作中要完成的工作分开,这些工人不会分享任何知识。

我不确定驱动你的需求是否有Spark步骤失败,但我会建议在你的应用程序代码中跟踪那个失败案例,而不是试图直接触发死亡。换句话说,编写检测错误的代码并将其传递给您的火花驱动程序,然后根据需要对其执行操作。

执行此操作的一种方法是使用累加器来计算处理数据时发生的任何错误。它会看起来大致是这样的(我假设Scala和DataFrames,但您可以根据需要适应RDD的和/或Python):

val accum = sc.longAccumulator("Error Counter") 
def doProcessing(a: String, b: String): String = { 
    if(condition) { 
    accum.add(1) 
    null 
    } 
    else { 
    doComputation(a, b) 
    } 
} 
val doProcessingUdf = udf(doProcessing _) 

df = df.withColumn("result", doProcessing($"a", $"b")) 

df.write.format(..).save(..) // Accumulator value not computed until an action occurs! 

if(accum.value > 0) { 
    // An error detected during computation! Do whatever needs to be done. 
    <insert dynamo message here> 
} 

关于这种方法的一个好处是,如果你正在寻找反馈在Spark UI中,您可以在运行时看到累加器值。作为参考,这里是关于蓄电池的文件: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators