PySpark中的UDFs函数

我们在用python原生的函数来处理迭代我们的数据,但是我们发现在处理一个比较大的dataframe可能会花费我们很久的时间。所以如果我们拥有一个集群,那么如何在集群上通过Pyspark来加速我们的处理速度呢?换句话说我们如何将python函数转化成Spark的用户自定义函数(UDF)呢?

PySpark中的UDFs函数

注册一个UDF

Pyspark UDFs跟pandas的series和dataframes的.map()和.apply()方法类似。我可以将dataframe中的行作为函数的输入值,然后我们可以遍历整个dataframe。那唯一的区别是PySpark UDFs必须定义输出数据的类型。

举个例子,我从pandas的dataframe中创建一个PySpark的dataframe。

import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("basic-op-price-fluctuate") \
    .master("local") \
    .config("fs.defaultFS","hdfs://host1:8020") \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df_pd = pd.DataFrame(
    data={'integers':[1,2,3],
          'floats':[-1.0,0.6,2.6],
          'integer_arrays':[[1,2],[3,4.6],[5,6,8,9]]}
)
df = spark.createDataFrame(df_pd)
df.printSchema()

PySpark中的UDFs函数

df.show()

PySpark中的UDFs函数

原始类型输出:

我们想将一个求平方和的python函数注册成一个Spark UDF函数

def square(x):

    return x**2

在Spark中python函数的输出是类型相关的,当我们在注册UDFs时,我们必须通过pyspark.sql.types来申明数据类型。

在Spark UDF中它不会把Integer转化成float或者double类型,如果在UDF中指定的类型与输出的类型不一致的话,结果会返回null。

# Integer type output
from pyspark.sql.types import IntegerType
square_udf_int = udf(lambda z:square(z),IntegerType())
df.select('integers',
           'floats',
           square_udf_int('integers').alias('int_squared'),
           square_udf_int('floats').alias('float_squared')).show()

PySpark中的UDFs函数

将结果注册成float类型

# float type output
from pyspark.sql.types import FloatType
square_udf_float = udf(lambda z: square(z), FloatType())
df.select('integers',
           'floats',
          square_udf_float('integers').alias('int_squared'),
          square_udf_float('floats').alias('float_squared')).show()

PySpark中的UDFs函数

列表输出类型

如果我们的输出类型是一个列表,那么我们在注册UDF时候将类型注册成ArrayType()。

from pyspark.sql.types import ArrayType

def square_list(x):
    return [float(val)**2 for val in x]
square_list_udf = udf(lambda y: square_list(y), ArrayType(FloatType()))
df.select('integer_arrays', square_list_udf('integer_arrays')).show()

PySpark中的UDFs函数

对于返回类型是tuple或者是混合类型的函数,我们可以定义一个相关的StructType(),里面的字段可以定义为StructField()。举个例子,有一个函数,输入一个数字,返回数字以及该数字对应字母表中的字母。

import string
from pyspark.sql.types import StructField,StructType,IntegerType,StringType
def convert_ascii(number):
    return [number, string.ascii_letters[number]]
array_schema = StructType([
    StructField('number', IntegerType(), nullable=False),
    StructField('letters', StringType(), nullable=False)
])

spark_convert_ascii = udf(lambda z: convert_ascii(z), array_schema)

df_ascii = df.select('integers', spark_convert_ascii('integers').alias('ascii_map'))
df_ascii.show()

PySpark中的UDFs函数

解决dataframe数据量太少而导致集训运行时间过慢的问题,由于dataframe太小,spark会将整个dataframe分配到同一个executor,而其他的executor会闲置等待,也就是说spark并没有将python function并行化运算。

为了解决这个问题,我们可以在调用UDF函数之前将dataframe重分片。

df_repartitioned = df.repartition(100)

当我们提交一个作业时,要确保分片的数量要要多余executor的个数。

 

基于注解方式Pandas UDF

Spark 在 0.7 版中添加了 Python API,并支持user-defined functions。这些用户定义的函数一次只能操作一行,因此会遭遇高序列化和调用开销。因此,许多数据管道在 Java 和 Scala 中定义 UDF,然后从 Python 中调用它们。

基于 Apache Arrow 构建的 Pandas UDF 为您提供了两全其美的功能 - 完全用 Python 定义低开销,高性能 UDF的能力。

Spark 2.3 中,将会有两种类型的 Pandas UDF: 标量(scalar)和分组映射(grouped map)Spark2.4新支持Grouped Aggregate

Scalar Pandas UDFs

Scalar Pandas UDFs 用于向量化标量运算。要定义一个标量 Pandas UDF,只需使用 @pandas_udf 来注释一个 Python 函数,该函数接受 pandas.Series 作为参数并返回另一个相同大小的 pandas.Series。

Grouped Map Pandas UDFs

Python 用户对数据分析中的 split-apply-combine 模式非常熟悉。Grouped Map Pandas UDF 是针对这种情况设计的,它们针对某些组的所有数据进行操作,例如“针对每个日期应用此操作”。

Grouped Map Pandas UDF 首先根据 groupby 运算符中指定的条件将 Spark DataFrame 分组,然后将用户定义的函数(pandas.DataFrame -> pandas.DataFrame)应用于每个组,并将结果组合并作为新的 Spark DataFrame 返回。

安装

安装PySpark with PyArrow

pip install pyspark[sql]=2.3.2

在代码中添加启动arrow

# Enable Arrow-based columnar data transfers

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

 

求平均数:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession \
    .builder \
    .appName("basic-op-price-fluctuate") \
    .getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(substract_mean).show()