org.apache.spark.SparkException: Failed to execute user defined function

Problem description:

I am new to Scala and I am trying to execute the following code, which fails with org.apache.spark.SparkException: Failed to execute user defined function:

val SetID = udf{(c:String, d: String) => 
    if(c.UpperCase.contains("EXKLUS") == true) 
    {d} 
    else {""} 
} 
val ParquetWithID = STG1 
    .withColumn("ID", SetID(col("line_item"), col("line_item_ID"))) 

Both columns (line_item and line_item_ID) are defined as Strings in the STG1 schema.

I get the following error when I try to run the code:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1$$anonfun$2: (string, string) => string) 
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
at org.apache.spark.scheduler.Task.run(Task.scala:86) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

Caused by: java.lang.NullPointerException 
    at MyTests$$anonfun$1$$anonfun$2.apply(MyTests.scala:356) 
    at MyTests$$anonfun$1$$anonfun$2.apply(MyTests.scala:355) 
    ... 16 more 

I also tried c.UpperCase().contains("EXKLUS"), but I get the same error. However, if I just run an "if equals" statement, everything works fine. So I guess the problem is the use of UpperCase().contains(" ") inside my udf, but I don't understand where it comes from. Any help would be appreciated!

If your schema is

|-- line_item: string (nullable = true) 
|-- line_item_ID: string (nullable = true) 

then adding a null check to your if statement should solve the problem (note also that the String method is toUpperCase):

val SetID = udf { (c: String, d: String) =>
    if (c != null && c.toUpperCase.contains("EXKLUS")) d
    else ""
}
val ParquetWithID = STG1 
    .withColumn("ID", SetID(col("line_item"), col("line_item_ID"))) 
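The NullPointerException in the stack trace comes from calling toUpperCase on a null value. Besides the explicit null check above, the same guard can be written with Option, which makes the null case impossible to miss. A minimal sketch of the UDF body as a plain Scala function (setId is a hypothetical name introduced here; it is not from the original answer):

```scala
// Null-safe rewrite of the UDF body as a plain Scala function.
// Option(c) yields None when c is null, so the guard never
// dereferences a null reference.
def setId(c: String, d: String): String =
  Option(c) match {
    case Some(s) if s.toUpperCase.contains("EXKLUS") => d
    case _                                           => ""
  }
```

Wrapping it with udf(setId _) would register it just like the lambda above. Alternatively, built-in column functions such as when and upper propagate nulls for you and avoid the issue entirely.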

I hope the answer helps.

This works! Thank you! – Inna

Glad to hear it @Inna, and thanks for accepting :) –