分裂火花数据帧和基于一个列值
问题描述:
计算平均我有两个dataframes: 第一数据帧classRecord
有10个不同的条目像以下:分裂火花数据帧和基于一个列值
Class, Calculation
first, Average
Second, Sum
Third, Average
第二数据帧studentRecord
具有围绕50K条目像以下:
Name, height, Camp, Class
Shae, 152, yellow, first
Joe, 140, yellow, first
Mike, 149, white, first
Anne, 142, red, first
Tim, 154, red, Second
Jake, 153, white, Second
Sherley, 153, white, Second
从第二个数据框中,根据类的类型,我想分别基于阵营进行高度计算(对于第一类:平均值,第二类:总和等)(如果类是fir st,平均黄色,白色等)。 我尝试以下操作:
//function to calculate average
def averageOnName(splitFrame : org.apache.spark.sql.DataFrame) : Array[(String, Double)] = {
val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name",$"height".cast("double")).as[(String, Double)].rdd
var avg_by_key = pairedRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(y => 1.0 * y._1/y._2).collect
return avg_by_key
}
//required schema for further modifications
val schema = StructType(
StructField("name", StringType, false) ::
StructField("avg", DoubleType, false) :: Nil)
// for each loop on each class type
classRecord.rdd.foreach{
//filter students based on camps
var campYellow =studentRecord.filter($"Camp" === "yellow")
var campWhite =studentRecord.filter($"Camp" === "white")
var campRed =studentRecord.filter($"Camp" === "red")
// since I know that calculation for first class is average, so representing calculation only for class first
val avgcampYellow = averageOnName(campYellow)
val avgcampWhite = averageOnName(campWhite)
val avgcampRed = averageOnName(campRed)
// union of all
val rddYellow = sc.parallelize (avgcampYellow).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
//conversion of rdd to frame
var dfYellow = sqlContext.createDataFrame(rddYellow, schema)
//union with yellow camp data
val rddWhite = sc.parallelize (avgcampWhite).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
//conversion of rdd to frame
var dfWhite = sqlContext.createDataFrame(rddWhite, schema)
var dfYellWhite = dfYellow.union(dfWhite)
//union with yellow,white camp data
val rddRed = sc.parallelize (avgcampRed).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
//conversion of rdd to frame
var dfRed = sqlContext.createDataFrame(rddRed, schema)
var dfYellWhiteRed = dfYellWhite .union(dfRed)
// other modifications and final result to hive
}
在这里,我挣扎:
1.hardcoding Yellow, red and white, there may be other camp type also.
2. Filtering same dataframe many times
3. Not able to figure out how to calculate differently according to class calculation type.
帮助表示赞赏。谢谢。
答
您可以简单地对Class/Camp的所有组合进行平均和求和计算,然后分别解析classRecord
数据帧并提取您需要的数据。您可以通过使用groupBy()
方法轻松完成此操作并汇总值。
使用您的示例数据帧:
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
studentRecord.show()
+-------+------+------+------+
| Name|height| Camp| Class|
+-------+------+------+------+
| Shae| 152|yellow| first|
| Joe| 140|yellow| first|
| Mike| 149| white| first|
| Anne| 142| red| first|
| Tim| 154| red|Second|
| Jake| 153| white|Second|
|Sherley| 153| white|Second|
+-------+------+------+------+
val df = studentRecord.groupBy("Class", "Camp").agg(
sum($"height").as("Sum"),
avg($"height").as("Average"),
collect_list($"Name").as("Names"))
df.show()
+------+------+---+-------+---------------+
| Class| Camp|Sum|Average| Names|
+------+------+---+-------+---------------+
| first| white|149| 149.0| [Mike]|
| first| red|142| 142.0| [Anne]|
|Second| red|154| 154.0| [Tim]|
|Second| white|306| 153.0|[Jake, Sherley]|
| first|yellow|292| 146.0| [Shae, Joe]|
+------+------+---+-------+---------------+
这样做后,你可以简单地检查你的第一classRecord
数据帧之后,行你需要的。它可以看起来像什么样的例子,可以根据您的实际需求进行更改:
// Collects the dataframe as an Array[(String, String)]
val classRecs = classRecord.collect().map{case Row(clas: String, calc: String) => (clas, calc)}
for (classRec <- classRecs){
val clas = classRec._1
val calc = classRec._2
// Matches which calculation you want to do
val df2 = calc match {
case "Average" => df.filter($"Class" === clas).select("Class", "Camp", "Average")
case "Sum" => df.filter($"Class" === clas).select("Class", "Camp", "Sum")
}
// Do something with df2
}
希望它有帮助!
如果我理解正确,您希望高度的平均值或总和取决于Camp和Class?怎么样计算所有camp/class的组合,把它放在一个数据框中,然后分别读取'classRecord'df? – Shaido