用Scala-Spark中的平均值填充Nan
问题描述:
我有一个带有6列的RDD
,其中最后5列可能包含NaN。我的目的是用非Nan的最后5个值的其余值的平均值替换NaN。例如,具有这种输入:用Scala-Spark中的平均值填充Nan
1, 2, 3, 4, 5, 6
2, 2, 2, NaN, 4, 0
3, NaN, NaN, NaN, 6, 0
4, NaN, NaN, 4, 4, 0
输出应该是:
1, 2, 3, 4, 5, 6
2, 2, 2, 2, 4, 0
3, 3, 3, 3, 6, 0
4, 3, 3, 4, 4, 0
我知道如何填补这些NaN的配列改造RDD
到DataFrame
的平均值:
var aux1 = df.select(df.columns.map(c => mean(col(c))) :_*)
var aux2 = df.na.fill(/*get values of aux1*/)
我的问题是,你如何做这个操作,而不是用平均列填充NaN,用平均值填充一个子集行的p?
答
你可以通过定义一个函数来获取平均做到这一点,和其他功能连续补空。
由于DF您呈现:
val df = sc.parallelize(List((Some(1),Some(2),Some(3),Some(4),Some(5),Some(6)),(Some(2),Some(2),Some(2),None,Some(4),Some(0)),(Some(3),None,None,None,Some(6),Some(0)),(Some(4),None,None,Some(4),Some(4),Some(0)))).toDF("a","b","c","d","e","f")
我们需要一个函数来获取行的意思是:
import org.apache.spark.sql.Row
def rowMean(row: Row): Int = {
val nonNulls = (0 until row.length).map(i => (!row.isNullAt(i), row.getAs[Int](i))).filter(_._1).map(_._2).toList
nonNulls.sum/nonNulls.length
}
,另一个在行填充空值:
def rowFillNulls(row: Row, fill: Int): Row = {
Row((0 until row.length).map(i => if (row.isNullAt(i)) fill else row.getAs[Int](i)) : _*)
}
现在,我们可以首先计算每一行平均:
val rowWithMean = df.map(row => (row,rowMean(row)))
然后往里面:
val result = sqlContext.createDataFrame(rowWithMean.map{case (row,mean) => rowFillNulls(row,mean)}, df.schema)
之前和之后的最后查看...
df.show
+---+----+----+----+---+---+
| a| b| c| d| e| f|
+---+----+----+----+---+---+
| 1| 2| 3| 4| 5| 6|
| 2| 2| 2|null| 4| 0|
| 3|null|null|null| 6| 0|
| 4|null|null| 4| 4| 0|
+---+----+----+----+---+---+
result.show
+---+---+---+---+---+---+
| a| b| c| d| e| f|
+---+---+---+---+---+---+
| 1| 2| 3| 4| 5| 6|
| 2| 2| 2| 2| 4| 0|
| 3| 3| 3| 3| 6| 0|
| 4| 3| 3| 4| 4| 0|
+---+---+---+---+---+---+
这会为任何宽度DF工作为int的列。您可以轻松地此更新到其它数据类型,甚至非数字(提示,检查DF模式!)
答
嗯,这是一个有趣的小问题 - 我会后我的解决办法,但我一定会看,看是否有人想出了这样做:)
首先一个更好的方式我会介绍一些udf
S:
val avg = udf((values: Seq[Integer]) => {
val notNullValues = values.filter(_ != null).map(_.toInt)
notNullValues.sum/notNullValues.length
})
val replaceNullWithAvg = udf((x: Integer, avg: Integer) => if(x == null) avg else x)
,我会再申请到DataFrame
这样的:
dataframe
.withColumn("avg", avg(array(df.columns.tail.map(s => df.col(s)):_*)))
.select('col1, replaceNullWithAvg('col2, 'avg) as "col2", replaceNullWithAvg('col3, 'avg) as "col3", replaceNullWithAvg('col4, 'avg) as "col4", replaceNullWithAvg('col5, 'avg) as "col5", replaceNullWithAvg('col6, 'avg) as "col6")
这将让你什么ÿ OU正在寻找,但无疑不是最复杂的代码,我曾经放在一起......
答
一堆进口:
import org.apache.spark.sql.functions.{col, isnan, isnull, round, when}
import org.apache.spark.sql.Column
一些辅助函数:
def nullOrNan(c: Column) = isnan(c) || isnull(c)
def rowMean(cols: Column*): Column = {
val sum = cols
.map(c => when(nullOrNan(c), lit(0.0)).otherwise(c))
.fold(lit(0.0))(_ + _)
val count = cols
.map(c => when(nullOrNan(c), lit(0.0)).otherwise(lit(1.0)))
.fold(lit(0.0))(_ + _)
sum/count
}
A液:
val mean = round(
rowMean(df.columns.tail.map(col): _*)
).cast("int").alias("mean")
val exprs = df.columns.tail.map(
c => when(nullOrNan(col(c)), mean).otherwise(col(c)).alias(c)
)
val filled = df.select(col(df.columns(0)) +: exprs: _*)
所以,我对我的回答改进,使得'平均'-udf能够处理任意数量的列。我尊重你已经接受了另一个答案,但我想指出,我的解决方案不需要你在'rdds'和'dataframes'之间来回切换,而是直接在'dataframe'上运行:) –