pyspark、hive和dateframe临时udf使用案例
背景
在我们数据开发过程中业务中有很多计算时间差值的场景,公司业务数据时间格式基本是:yyyyMMdd,,而spark或者hive提供内置函数datediff(),需要传入的时间参数格式:yyyy-MM-dd,为开发时简化代码量与代码可读性,我们通过创建临时udf可以灵活的解决该问题。
pyspark
import time def dd_datediff(date_now, date_ago, date_type="day"): date_now = time.strptime(str(date_now), '%Y%m%d') date_ago = time.strptime(str(date_ago), '%Y%m%d') # 暂时返回天数 if date_type == 'day': day_num = int((time.mktime(date_now) - time.mktime(date_ago)) / (24 * 60 * 60)) return day_num
spark.udf.register("dd_datediff", lambda x, y: dd_datediff(x, y)) spark.sql("select dd_datediff(20200702, 20200629)") |
效果演示:
hive
CREATE TEMPORARY MACRO dd_datediff(data_date_now string, data_date_ago string) int(datediff(from_unixtime(unix_timestamp(data_date_now, 'yyyyMMdd'), 'yyyy-MM-dd'), from_unixtime(unix_timestamp(data_date_ago, 'yyyyMMdd'), 'yyyy-MM-dd'))); select dd_datediff(20200702, 20200629); |
效果演示:
dataframe
df = spark.sql("select 20200702 as start,20200629 as end") df.show() import time def my_datediff(date_now, date_ago, date_type="day"): date_now = time.strptime(str(date_now), '%Y%m%d') date_ago = time.strptime(str(date_ago), '%Y%m%d') # 暂时返回天数 if date_type == 'day': day_num = int((time.mktime(date_now) - time.mktime(date_ago)) / (24 * 60 * 60)) return day_num
from pyspark.sql import functions,types dd_datediff = functions.udf(my_datediff, types.StringType()) df = df.withColumn("diff", dd_datediff(df["start"], df["end"])).show() |
效果演示: