How do I apply a window function in pyspark to grouped data that requires within-group aggregation?

Problem description:

I have a complex windowing operation that I need help with in pyspark. How do I apply a window function in pyspark to grouped data that requires aggregation within each group?

I have some data grouped by src and dest, and for each group I need to do the following:

- select only the rows whose socket2 value does not appear anywhere in socket1 (across all rows of that group)
- after applying that filter, sum the amounts field

amounts  src  dest  socket1  socket2
10       1    2     A        B
11       1    2     B        C
12       1    2     C        D
510      1    2     C        D
550      1    2     B        C
500      1    2     A        B
80       1    3     A        B

I want to aggregate it in the following way:
510 + 12 = 522, and 80 is the only record for src = 1 and dest = 3

amounts  src  dest
522      1    2
80       1    3
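
For concreteness, the filter rule can be restated in plain Python over the sample rows (a minimal sketch just to pin down the semantics, not part of the pyspark solution; the helper names are made up):

from collections import defaultdict

rows = [
    (10, 1, 2, "A", "B"), (11, 1, 2, "B", "C"), (12, 1, 2, "C", "D"),
    (510, 1, 2, "C", "D"), (550, 1, 2, "B", "C"), (500, 1, 2, "A", "B"),
    (80, 1, 3, "A", "B"),
]

# all socket1 values seen in each (src, dest) group
socket1_by_group = defaultdict(set)
for amounts, src, dest, s1, s2 in rows:
    socket1_by_group[(src, dest)].add(s1)

# keep a row only if its socket2 never appears as socket1 in its group, then sum
totals = defaultdict(int)
for amounts, src, dest, s1, s2 in rows:
    if s2 not in socket1_by_group[(src, dest)]:
        totals[(src, dest)] += amounts

print(dict(totals))  # {(1, 2): 522, (1, 3): 80}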

I borrowed the sample data from here: How to write Pyspark UDAF on multiple columns?

+1

Have you tried anything yet? Can you share your attempts? – Mariusz

+0

Yes, one second please – guimption

You can split the dataframe into two dataframes, one with socket1 and the other with socket2, and then use a leftanti join instead of filtering (works for Spark >= 2.0).

First, let's create the dataframe:

df = spark.createDataFrame(
    sc.parallelize([
        [10, 1, 2, "A", "B"],
        [11, 1, 2, "B", "C"],
        [12, 1, 2, "C", "D"],
        [510, 1, 2, "C", "D"],
        [550, 1, 2, "B", "C"],
        [500, 1, 2, "A", "B"],
        [80, 1, 3, "A", "B"]
    ]),
    ["amounts", "src", "dest", "socket1", "socket2"]
)

Now split the dataframe:

Spark >= 2.0

# socket1 values, one row per (src, dest, socket1) occurrence
df1 = df.withColumnRenamed("socket1", "socket").drop("socket2")
# socket2 values, still carrying the amounts we want to keep
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
# keep only the df2 rows with no matching socket1 row in the same group
res = df2.join(df1, ["src", "dest", "socket"], "leftanti")

Spark 1.6

df1 = df.withColumnRenamed("socket1", "socket").drop("socket2").withColumnRenamed("amounts", "amounts1") 
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1") 
res = df2.join(df1.alias("df1"), ["src", "dest", "socket"], "left").filter("amounts1 IS NULL").drop("amounts1") 
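
In the Spark 1.6 variant, the left join attaches amounts1 only when a matching (src, dest, socket) row exists in df1, so filtering on amounts1 IS NULL keeps exactly the unmatched rows, which is what leftanti does natively in Spark >= 2.0. Either way, res should end up with only the rows whose socket never occurs as socket1 within the same (src, dest) group. A quick check (the exact show() layout may differ):

# expected surviving rows for the sample data:
#   src=1, dest=2, socket="D", amounts=12
#   src=1, dest=2, socket="D", amounts=510
#   src=1, dest=3, socket="B", amounts=80
res.select("src", "dest", "socket", "amounts").show()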

Finally, aggregate:

import pyspark.sql.functions as psf 
res.groupBy("src", "dest").agg(
    psf.sum("amounts").alias("amounts") 
).show() 

+---+----+-------+
|src|dest|amounts|
+---+----+-------+
|  1|   3|     80|
|  1|   2|    522|
+---+----+-------+
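
Since the question explicitly asks about window functions: the same result can also be obtained without the self-join by collecting each group's socket1 values over a window and filtering with array_contains. This is an untested sketch (assuming Spark >= 2.0, where collect_set works as a window function), not the approach used above:

import pyspark.sql.functions as psf
from pyspark.sql import Window

# one window per (src, dest) group, no ordering needed
w = Window.partitionBy("src", "dest")

res_w = (
    df
    # attach the set of all socket1 values of the row's group
    .withColumn("socket1_set", psf.collect_set("socket1").over(w))
    # keep rows whose socket2 never appears as socket1 in the group
    .filter(~psf.expr("array_contains(socket1_set, socket2)"))
    .groupBy("src", "dest")
    .agg(psf.sum("amounts").alias("amounts"))
)
res_w.show()

For the sample data this should print the same two rows as the table above.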
+1

and... +1 for 1k :) @Marie – Prem

+1

Haha, thanks @Prem – MaFF