pyspark、hive和dateframe临时udf使用案例

背景

        在我们数据开发过程中业务中有很多计算时间差值的场景,公司业务数据时间格式基本是:yyyyMMdd,,而spark或者hive提供内置函数datediff(),需要传入的时间参数格式:yyyy-MM-dd,为开发时简化代码量与代码可读性,我们通过创建临时udf可以灵活的解决该问题。

pyspark

import time

def dd_datediff(date_now, date_ago, date_type="day"):

date_now = time.strptime(str(date_now), '%Y%m%d')

date_ago = time.strptime(str(date_ago), '%Y%m%d')

# 暂时返回天数

if date_type == 'day':

day_num = int((time.mktime(date_now) - time.mktime(date_ago)) / (24 * 60 * 60))

return day_num

 

spark.udf.register("dd_datediff", lambda x, y: dd_datediff(x, y))

spark.sql("select dd_datediff(20200702, 20200629)")

效果演示:

pyspark、hive和dateframe临时udf使用案例

 

hive

CREATE TEMPORARY MACRO dd_datediff(data_date_now string, data_date_ago string) int(datediff(from_unixtime(unix_timestamp(data_date_now, 'yyyyMMdd'), 'yyyy-MM-dd'), from_unixtime(unix_timestamp(data_date_ago, 'yyyyMMdd'), 'yyyy-MM-dd')));

select dd_datediff(20200702, 20200629);

效果演示:

pyspark、hive和dateframe临时udf使用案例

dataframe

df = spark.sql("select 20200702 as start,20200629 as end")

df.show()

import time

def my_datediff(date_now, date_ago, date_type="day"):

date_now = time.strptime(str(date_now), '%Y%m%d')

date_ago = time.strptime(str(date_ago), '%Y%m%d')

# 暂时返回天数

if date_type == 'day':

day_num = int((time.mktime(date_now) - time.mktime(date_ago)) / (24 * 60 * 60))

return day_num

 

from pyspark.sql import functions,types

dd_datediff = functions.udf(my_datediff, types.StringType())

df = df.withColumn("diff", dd_datediff(df["start"], df["end"])).show()

效果演示:

 

pyspark、hive和dateframe临时udf使用案例