更新基于另一列
问题描述:
一个数组Pyspark DF列这是我的pyspark数据框模式:更新基于另一列
root
|-- user: string (nullable = true)
|-- table: string (nullable = true)
|-- changeDate: string (nullable = true)
|-- fieldList: string (nullable = true)
|-- id: string (nullable = true)
|-- value2: integer (nullable = false)
|-- value: double (nullable = false)
|-- name: string (nullable = false)
|-- temp: array (nullable = true)
| |-- element: string (containsNull = true)
|-- num_cols_changed: integer (nullable = true)
在数据帧中的数据:
+--------+-----+--------------------+--------------------+------+------+-----+----+--------------------+----------------+
| user|table| changeDate| fieldList| id|value2|value|name| temp|num_cols_changed|
+--------+-----+--------------------+--------------------+------+------+-----+----+--------------------+----------------+
| user11 | TAB1| 2016-01-24 19:10...| value2 = 100|555555| 200| 0.5| old| [value2 = 100]| 1|
| user01 | TAB1| 2015-12-31 13:12...|value = 0.34,name=new| 1111| 200| 0.5| old|[value = 0.34, n...| 2|
+--------+-----+--------------------+--------------------+------+------+-----+----+--------------------+----------------+
我想读的临时数组列,并基于这些值,我想更改数据框中的列。例如,第一行只有一列被更改,即value 2
,所以我想用新的值100更新列df.value2
。同样,在下一行中,更改了2列,所以我需要提取值和名称与他们的值并更新数据框中的适当列。所以输出应该是这样的:
+--------+-----+--------------------+------+------+-----+----+
| user|table| changeDate| id|value2|value|name|
+--------+-----+--------------------+------+------+-----+----+
| user11 | TAB1| 2016-01-24 19:10...|555555| 100| 0.5| old|
| user01 | TAB1| 2015-12-31 13:12...| 1111| 200| 0.34| new|
+--------+-----+--------------------+------+------+-----+----+
我想记住程序的性能,因此在仅仅使用dataframes方式聚焦,但如果没有选择我可以去RDD路线了。 基本上,我不知道如何在一行中处理多个值然后进行比较。我知道我可以使用column in df.columns
来比较列名,但是对于使用数组的每一行这样做会让我感到困惑。任何帮助或新想法表示赞赏。
答
这是我如何解决这个使用explode
:
df = df.withColumn('temp', split(df.fieldList, ','))
df = df.withColumn('cols', explode(df.temp))
df = df.withColumn('col_value', split(df.cols, '='))
df = df.withColumn('deltaCol', df.col_value[0])
.withColumn('deltaValue',df.col_value[1])
上述的最终输出(下降无关列后),因此本:
+------+-----+--------+--------------------+--------+----------+
| id|table| user| changeDate|deltaCol|deltaValue|
+------+-----+--------+--------------------+--------+----------+
|555555| TAB2| user11 | 2016-01-24 19:10...| value2 | 100|
| 1111| TAB1| user01 | 2015-12-31 13:12...| value | 0.34|
| 1111| TAB1| user01 | 2015-12-31 13:12...| name | 'newName'|
+------+-----+--------+--------------------+--------+----------+
这个我把它注册为一个表格之后并执行SQL操作以转发数据:
>>> res = sqlContext.sql("select id, table, user, changeDate, max(value2) as value2, max(value) as value, max(name) as name \
... from (select id, table, user, changeDate, case when trim(deltaCol) == 'value2' then deltaValue else Null end value2,\
... case when trim(deltaCol) == 'value' then deltaValue else Null end value,\
... case when trim(deltaCol) == 'name' then deltaValue else Null end name from delta) t group by id, table, user, changeDate")
这样的结果是:
+------+-----+--------+--------------------+------+-----+----------+
| id|table| user| changeDate|value2|value| name|
+------+-----+--------+--------------------+------+-----+----------+
|555555| TAB2| user11 | 2016-01-24 19:10...| 100| null| null|
| 1111| TAB1| user01 | 2015-12-31 13:12...| null| 0.34| 'newName'|
+------+-----+--------+--------------------+------+-----+----------+
对于这个码与不同的表的使用,I使用的主DF(我的最终目标表)的列,以制备列的字符串:
>>> string = [(", max(" + c + ") as " + c) for c in masterDF.columns]
>>> string = "".join(string)
>>> string
', max(id) as id, max(value) as value, max(name) as name, max(value2) as value2'