与Apache火花flattern各组的2个第一行与Java

问题描述:

给予以下的输入表:与Apache火花flattern各组的2个第一行与Java

+----+------------+----------+ 
| id | shop  | purchases| 
+----+------------+----------+ 
| 1 | 01   |  20 | 
| 1 | 02   |  31 | 
| 2 | 03   |  5 | 
| 1 | 03   |  3 | 
+----+------------+----------+ 

我想,通过ID分组和基于所述购买,得到第一2米顶部的商店作为遵循:

+----+-------+------+ 
| id | top_1 | top_2| 
+----+-------+------+ 
| 1 | 02 | 01 | 
| 2 | 03 |  | 
+----+-------+------+ 

我使用Apache 2.0.1星火和第一表是其他查询的结果,并加入这些都对数据集。我可以用传统的java遍历数据集来做到这一点,但我希望有另一种使用数据集功能的方法。 我第一次尝试以下操作:

//dataset is already ordered by id, purchases desc 
... 
Dataset<Row> ds = dataset.repartition(new Column("id")); 
ds.foreachPartition(new ForeachPartitionFunction<Row>() { 
     @Override 
     public void call(Iterator<Row> itrtr) throws Exception { 
      int counter = 0; 
      while (itrtr.hasNext()) { 
       Row row = itrtr.next(); 
       if(counter < 2) 
       //save it into another Dataset 
       counter ++; 
      } 
     } 
    }); 

但后来我是在如何将其保存到另一个数据集丢失。最后,我的目标是将结果保存到MySQL表中。

使用窗口功能,并转动你可以定义一个窗口:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.{col, first, row_number} 

val w = Window.partitionBy(col("id")).orderBy(col("purchases").desc) 

添加row_number和过滤器上两行:

val dataset = Seq(
    (1, "01", 20), (1, "02", 31), (2, "03", 5), (1, "03", 3) 
).toDF("id", "shop", "purchases") 

val topTwo = dataset.withColumn("top", row_number.over(w)).where(col("top") <= 2) 

和枢轴:

topTwo.groupBy(col("id")).pivot("top", Seq(1, 2)).agg(first("shop")) 

与结果是:

+---+---+----+ 
| id| 1| 2| 
+---+---+----+ 
| 1| 02| 01| 
| 2| 03|null| 
+---+---+----+ 

我将离开语法转换为Java作为海报练习(不包括import static其余功能应该接近完全相同)。