如何基于火花阶现有列中添加新列

问题描述:

如何基于火花阶现有列中添加新列

我做打造阿帕奇火花使用Mllib ALS的建议,与输出

user | product | rating 
    1 | 20 | 0.002 
    1 | 30 | 0.001 
    1 | 10 | 0.003 
    2 | 20 | 0.002 
    2 | 30 | 0.001 
    2 | 10 | 0.003 

,但我需要改变的数据结构基础按类别排序,如下:

user | product | rating | number_rangking 
    1 | 10 | 0.003 | 1 
    1 | 20 | 0.002 | 2 
    1 | 30 | 0.001 | 3 
    2 | 10 | 0.002 | 1 
    2 | 20 | 0.001 | 2 
    2 | 30 | 0.003 | 3 

我该怎么做?也许任何一个都可以给我一个线索...

THX

所有你需要的是取决于细节的窗口功能,您选择使用rankrowNumber

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.rank 

val w = Window.partitionBy($"user").orderBy($"rating".desc) 

df.select($"*", rank.over(w).alias("number_rangking")).show 
// +----+-------+------+---------------+ 
// |user|product|rating|number_rangking| 
// +----+-------+------+---------------+ 
// | 1|  10| 0.003|    1| 
// | 1|  20| 0.002|    2| 
// | 1|  30| 0.001|    3| 
// | 2|  10| 0.003|    1| 
// | 2|  20| 0.002|    2| 
// | 2|  30| 0.001|    3| 
// +----+-------+------+---------------+ 

使用普通RDD可以groupByKey,本地处理和flatMap

rdd 
    // Convert to PairRDD 
    .map{case (user, product, rating) => (user, (product, rating))} 
    .groupByKey 
    .flatMap{case (user, vals) => vals.toArray 
    .sortBy(-_._2) // Sort by rating 
    .zipWithIndex // Add index 
    // Yield final values 
    .map{case ((product, rating), idx) => (user, product, rating, idx + 1)}}