How can I apply a window function in PySpark to grouped data that requires per-group aggregation?
Question:
I have a complex windowing operation that I need help with in PySpark.
I have some data grouped by src and dest, and for each group I need to:
- select only the rows whose socket2 value does not appear anywhere in socket1 (across all rows of that group)
- after applying that filter, sum the amounts field
amounts src dest socket1 socket2
10      1   2    A       B
11      1   2    B       C
12      1   2    C       D
510     1   2    C       D
550     1   2    B       C
500     1   2    A       B
80      1   3    A       B
I want to aggregate it in the following way (12 + 510 = 522 for src = 1 and dest = 2, and 80 is the only qualifying record for src = 1 and dest = 3):
amounts src dest
522     1   2
80      1   3
Answer:
You can split the dataframe into two dataframes, one with socket1 and one with socket2, and then use a leftanti join instead of filtering (works for Spark >= 2.0).
First, let's create the dataframe:
df = spark.createDataFrame(
    sc.parallelize([
        [10, 1, 2, "A", "B"],
        [11, 1, 2, "B", "C"],
        [12, 1, 2, "C", "D"],
        [510, 1, 2, "C", "D"],
        [550, 1, 2, "B", "C"],
        [500, 1, 2, "A", "B"],
        [80, 1, 3, "A", "B"]
    ]),
    ["amounts", "src", "dest", "socket1", "socket2"]
)
Now split the dataframe:
Spark >= 2.0:
df1 = df.withColumnRenamed("socket1", "socket").drop("socket2")
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
res = df2.join(df1, ["src", "dest", "socket"], "leftanti")
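As a quick sanity check (my addition, not part of the original answer), res at this point should contain only the socket2 rows whose socket value never occurs in socket1 within the same (src, dest) group; the join keys come first in the schema, and row order may vary:

res.show()
# +---+----+------+-------+
# |src|dest|socket|amounts|
# +---+----+------+-------+
# |  1|   2|     D|     12|
# |  1|   2|     D|    510|
# |  1|   3|     B|     80|
# +---+----+------+-------+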
Spark 1.6:
df1 = df.withColumnRenamed("socket1", "socket").drop("socket2").withColumnRenamed("amounts", "amounts1")
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
res = df2.join(df1.alias("df1"), ["src", "dest", "socket"], "left").filter("amounts1 IS NULL").drop("amounts1")
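Since Spark 1.6 has no leftanti join type, the left join keeps every row of df2 and fills amounts1 with NULL whenever no matching (src, dest, socket) exists in df1, so filtering on amounts1 IS NULL and dropping the helper column reproduces the anti-join.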
Finally, aggregate:
import pyspark.sql.functions as psf
res.groupBy("src", "dest").agg(
psf.sum("amounts").alias("amounts")
).show()
+---+----+-------+
|src|dest|amounts|
+---+----+-------+
| 1| 3| 80|
| 1| 2| 522|
+---+----+-------+
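Since the question asks specifically about window functions, note that the same result can be reached without splitting and joining, by collecting each group's socket1 values with collect_set over a window and filtering on array membership. This is a minimal sketch assuming Spark >= 2.0 (where collect_set is usable as a window function); the names group_sockets1 and res_w are just illustrative:

import pyspark.sql.functions as psf
from pyspark.sql import Window

w = Window.partitionBy("src", "dest")

res_w = (
    df
    # gather every socket1 value seen anywhere in the (src, dest) group
    .withColumn("group_sockets1", psf.collect_set("socket1").over(w))
    # keep rows whose socket2 never appears as a socket1 in that group
    .filter(psf.expr("NOT array_contains(group_sockets1, socket2)"))
    .groupBy("src", "dest")
    .agg(psf.sum("amounts").alias("amounts"))
)
res_w.show()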
Have you tried anything yet? Can you share your attempts? – Mariusz
Yes, one second please – guimption