Scala和星火UDF功能
问题描述:
我做了一个简单的UDF转换或在火花temptabl提取时间字段中的值。我注册函数,但是当我使用sql调用函数时,它会抛出一个NullPointerException异常。以下是我的功能和执行它的过程。我正在使用齐柏林飞艇。这件事昨天正在起作用,但今天早上停止了工作。Scala和星火UDF功能
功能
def convert(time:String) : String = {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
return sdf.format(time1)
}
注册功能
sqlContext.udf.register("convert",convert _)
测试功能,无需SQL - 这工作
convert(12:12:12) -> returns 12:12
试验飞艇与SQL函数失败。的不是Temptable
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- serverip: string (nullable = true)
|-- request: string (nullable = true)
|-- resource: string (nullable = true)
|-- protocol: integer (nullable = true)
|-- sourceip: string (nullable = true)
,我得到堆栈跟踪的部分
%sql
select convert(time) from temptable limit 10
结构。
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
答
使用UDF代替定义一个函数直接
import org.apache.spark.sql.functions._
val convert = udf[String, String](time => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
}
)
一个UDF的输入参数是列(或列)。而返回类型是Column。
case class UserDefinedFunction protected[sql] (
f: AnyRef,
dataType: DataType,
inputTypes: Option[Seq[DataType]]) {
def apply(exprs: Column*): Column = {
Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
}
}