PySpark平面图应该返回带有类型值的元组
问题描述:
我使用带PySpark的Jupyter Notebook。在那里我有一个数据框架,这些数据架构有一个列名和类型(整数,...)的列。现在我使用flatMap这样的方法,但是这会返回一个没有固定类型的元组列表。有没有办法实现这一点?PySpark平面图应该返回带有类型值的元组
df.printSchema()
root
|-- name: string (nullable = true)
|-- ...
|-- ...
|-- ratings: integer (nullable = true)
然后我用flatMap做的额定值一些计算(这里混淆):
df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings))
y_rate.toDF().printSchema()
现在我得到一个错误:
TypeError: Can not infer schema for type:
有什么办法通过保持模式使用map/flatMap/reduce?或者至少返回具有特定类型值的元组?
答
首先,您使用的是错误的功能。 flatMap
将map
和flatten
所以假设你的数据是这样的:
df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"])
的flatMap
的输出将等同于:
sc.parallelize(['foo', 0, 'bar', 5])
因此,你看到的错误。如果你真的想使它工作,你应该使用map
:
df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF()
## DataFrame[_1: string, _2: bigint]
接着,在DataFrame
映射不再2.0支持。您应首先提取rdd
(请参阅上面的df.rdd.map
)。
最后在Python和JVM之间传递数据效率极低。它不仅需要在Python和JVM之间传递数据以及相应的序列化/反序列化和模式推理(如果没有明确提供模式),这也会打破懒惰。这是更好地使用SQL表达式这样的事情:
from pyspark.sql.functions import when
df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings))
如果由于某种原因,你需要普通的Python代码的UDF可能是一个更好的选择。
非常有帮助。感谢您的示例代码。我只是没有得到flatMap vs Map的部分。 – Matthias
'flatMap'是一个函数'RDD [T] =>(T => Iterable [U])=> RDD [U]'。换句话说,它期望函数返回'Itereble'(Python元组),并连接这些(变平)结果。 – zero323
有没有办法在该声明中给出when/otherwise列的名称?请参阅'df.select(df.id,when(df.ratings> 5,5).otherwise(df.ratings))'@ zero323 – Matthias