火花UDF类型不匹配错误
问题描述:
我想写一个UDF将时间戳转换为表示一周小时的整数。我很容易用SparkSql来完成这件事。火花UDF类型不匹配错误
我有很多的UDF在我们这个确切的语法的代码,但此人试图类型不匹配错误。我也试着用col("session_ts_start")
来调用我的UDF,但那也失败了。
import spark.implicits._
import java.sql.Timestamp
import org.apache.spark.sql.functions._
def getHourOfWeek() = udf(
(ts: Timestamp) => unix_timestamp(ts)
)
val dDF = df.withColumn("hour", getHourOfWeek()(df("session_ts_start")))
dDF.show()
<console>:154: error: type mismatch;
found : java.sql.Timestamp
required: org.apache.spark.sql.Column
(ts: Timestamp) => unix_timestamp(ts)
答
unix_timestamp
是一个SQL函数。它operates on Columns
不是外在价值:
def unix_timestamp(s: Column): Column
,它不能在UDF中使用。
我想(...)的时间戳转换成表示一周的时间整数
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{date_format, hour}
def getHourOfWeek(c: Column) =
// https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html
(date_format(c, "u").cast("integer") - 1) * 24 + hour(c)
val df = Seq("2017-03-07 01:00:00").toDF("ts").select($"ts".cast("timestamp"))
df.select(getHourOfWeek($"ts").alias("hour")).show
+----+
|hour|
+----+
| 25|
+----+
另一种可能的解决方案:
import org.apache.spark.sql.functions.{next_day, date_sub}
def getHourOfWeek2(c: Column) = ((
c.cast("bigint") -
date_sub(next_day(c, "Mon"), 7).cast("timestamp").cast("bigint")
)/3600).cast("int")
df.select(getHourOfWeek2($"ts").alias("hour"))
+----+
|hour|
+----+
| 25|
+----+
注意:这两个解决方案都不处理夏令时或其他日期/时间微妙之处。