火花数据帧的GroupBy和计算复杂聚集函数

问题描述:

使用火花数据帧,我需要通过使用下面 复杂的公式来计算百分比:火花数据帧的GroupBy和计算复杂聚集函数

组由“KEY”,并计算“re_pct”为(总和(SA)/总和(SA /(PCT/100)))* 100

例如,输入数据帧是

val values1 = List(List("01", "20000", "45.30"), List("01", "30000", "45.30")) 
    .map(row => (row(0), row(1), row(2))) 

val DS1 = values1.toDF("KEY", "SA", "PCT") 
DS1.show() 

+---+-----+-----+ 
|KEY| SA| PCT| 
+---+-----+-----+ 
| 01|20000|45.30| 
| 01|30000|45.30| 
+---+-----+-----+ 

预期结果:

+---+-----+--------------+ 
|KEY| re_pcnt   | 
+---+-----+--------------+ 
| 01| 45.30000038505 | 
+---+-----+--------------+ 

我试图计算如下

val result = DS1.groupBy("KEY").agg(((sum("SA").divide(
    sum(
    ("SA").divide(
     ("PCT").divide(100) 
    ) 
) 
)) * 100).as("re_pcnt")) 

但面对错误:(36,16)的值除以不是字符串( “SA”)的成员除({

任何建议上。实现上述逻辑?

您可以尝试导入spark.implicits._,然后用$指到一列。

val spark = SparkSession.builder.getOrCreate() 
import spark.implicits._ 

val result = DS1.groupBy("KEY") 
    .agg(((sum($"SA").divide(sum(($"SA").divide(($"PCT").divide(100))))) * 100) 
    .as("re_pcnt")) 

哪个会给你请求的输出。

如果您不想导入,则始终可以使用col()命令而不是$


它可能使用一个字符串作为输入到agg()函数与使用expr()。但是,输入字符串需要稍微更改一下。下面给出了完全相同的结果和以前一样,但使用字符串而不是:

val opr = "sum(SA)/(sum(SA/(PCT/100))) * 100" 
val df = DS1.groupBy("KEY").agg(expr(opr).as("re_pcnt")) 

注意.as("re_pcnt")需要是agg()方法中,它不可能是之外。

+0

yes ..its正在工作。非常感谢。 现在我想通过使用如下变量的计算 val formulaCal =“((sum(col(\”SA \“))。divide(sum((col(\”SA \“))。 (“(KEY”) .agg(formulaCal) .as(“re_pcnt()”)除以((col(\“PCT \”))divide(100)))))* 100“ “)) \t \t 但这不起作用 – raam

+0

@raam在答案中增加了附加信息,显示如何将表达式作为变量使用(表达式需要一些小的更改)。如果答案对您有帮助,请考虑[接受/加注](https://*.com/help/someone-answers)。 – Shaido

+1

...非常感谢..这正是我在找什么。 – raam

您的代码几乎完美地工作。你只需把“$”符号,以指定要传递一个列:

val result = DS1.groupBy($"KEY").agg(((sum($"SA").divide(
    sum(
    ($"SA").divide(
     ($"PCT").divide(100) 
    ) 
) 
)) * 100).as("re_pcnt")) 

下面是输出:

result.show() 
+---+-------+                 
|KEY|re_pcnt| 
+---+-------+ 
| 01| 45.3| 
+---+-------+