Scala - How to split the probability column (a vector column), obtained when fitting a GMM model to data, into two separate columns?

Problem description:

I am trying to do the following: split the probability column (a vector column), obtained from fitting a GMM model to data, into two separate columns.

+-----+-------------------------+----------+-------------------------------------------+
|label|features                 |prediction|probability                                |
+-----+-------------------------+----------+-------------------------------------------+
|0.0  |(3,[],[])                |0         |[0.9999999999999979,2.093996169658831E-15] |
|1.0  |(3,[0,1,2],[0.1,0.1,0.1])|0         |[0.999999999999999,9.891337521299582E-16]  |
|2.0  |(3,[0,1,2],[0.2,0.2,0.2])|0         |[0.9999999999999979,2.0939961696578572E-15]|
|3.0  |(3,[0,1,2],[9.0,9.0,9.0])|1         |[2.093996169659668E-15,0.9999999999999979] |
|4.0  |(3,[0,1,2],[9.1,9.1,9.1])|1         |[9.89133752128275E-16,0.999999999999999]   |
|5.0  |(3,[0,1,2],[9.2,9.2,9.2])|1         |[2.0939961696605603E-15,0.9999999999999979]|
+-----+-------------------------+----------+-------------------------------------------+

I want to transform the above dataframe so that it has two additional columns, prob1 & prob2, each holding the corresponding element of the probability column.
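For context, a dataframe like the one above can be produced along the following lines (a sketch; the input path and k=2 follow the Spark GMM example and are assumptions, not taken from the question):

import org.apache.spark.ml.clustering.GaussianMixture

// Load sample data in libsvm format (path assumed from the Spark examples)
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Fit a two-component Gaussian mixture model
val model = new GaussianMixture().setK(2).fit(dataset)

// transform() appends the prediction and probability columns shown above
val predictions = model.transform(dataset)
predictions.show(false)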

I found similar questions - one in PySpark and one in Scala. I do not know how to translate the PySpark code, and the Scala code throws an error.

PySpark code:

split1_udf = udf(lambda value: value[0].item(), FloatType()) 
split2_udf = udf(lambda value: value[1].item(), FloatType()) 

output2 = randomforestoutput.select(split1_udf('probability').alias('c1'), split2_udf('probability').alias('c2')) 

Or, appending these columns to the original dataframe:

randomforestoutput.withColumn('c1', split1_udf('probability')).withColumn('c2', split2_udf('probability')) 

Scala code:

import org.apache.spark.sql.functions.udf 

val getPOne = udf((v: org.apache.spark.mllib.linalg.Vector) => v(1)) 
model.transform(testDf).select(getPOne($"probability")) 

I get the following error when I run the Scala code:

scala> predictions.select(getPOne(col("probability"))).show(false) 
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(probability)' due to data type mismatch: argument 1 requires vector type, however, '`probability`' is of vector type.;; 
'Project [UDF(probability#39) AS UDF(probability)#135] 
+- Project [label#0, features#1, prediction#34, UDF(features#1) AS probability#39] 
    +- Project [label#0, features#1, UDF(features#1) AS prediction#34] 
     +- Relation[label#0,features#1] libsvm 

I am currently using Scala 2.11.11 and Spark 2.1.1.

What I understand from your question is that you are trying to split the probability column into two columns, prob1 and prob2. If that is the case, then withColumn with element access should solve your issue.

predictions 
    .withColumn("prob1", $"probability"(0)) 
    .withColumn("prob2", $"probability"(1)) 
    .drop("probability") 
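In case the apply syntax above looks unfamiliar: on an array column, Column.getItem performs the same element access, so an equivalent form would be

import org.apache.spark.sql.functions.col

predictions
    .withColumn("prob1", col("probability").getItem(0))
    .withColumn("prob2", col("probability").getItem(1))
    .drop("probability")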

You can find more functions that can be applied to dataframes, which may help you in the future.

Edit

I created a temporary dataframe to match your column:

val predictions = Seq(Array(1.0,2.0), Array(2.0939961696605603E-15,0.9999999999999979), Array(Double.NaN,Double.NaN)).toDF("probability") 
+--------------------------------------------+
|probability                                 |
+--------------------------------------------+
|[1.0, 2.0]                                  |
|[2.0939961696605603E-15, 0.9999999999999979]|
|[NaN, NaN]                                  |
+--------------------------------------------+

Applying the withColumn calls above results in:

+----------------------+------------------+
|prob1                 |prob2             |
+----------------------+------------------+
|1.0                   |2.0               |
|2.0939961696605603E-15|0.9999999999999979|
|NaN                   |NaN               |
+----------------------+------------------+

Edit for schema mismatch

Since the schema of your probability column is a Vector and does not match the ArrayType schema assumed in the solution above, that solution will not work for your case. Please use the following solution instead.

You have to create udf functions that return the values as expected:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

// udfs that extract the first and second element of the probability vector
val first = udf((v: Vector) => v.toArray(0))
val second = udf((v: Vector) => v.toArray(1))

predictions
    .withColumn("prob1", first($"probability"))
    .withColumn("prob2", second($"probability"))
    .drop("probability")
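As a self-contained illustration (the values are taken from your dataframe above; assuming a spark-shell session so that spark.implicits._ is already in scope), the two udfs can be exercised like this:

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

val first = udf((v: Vector) => v.toArray(0))
val second = udf((v: Vector) => v.toArray(1))

// A toy dataframe whose probability column is a real ml Vector column
val df = Seq(
  Vectors.dense(0.9999999999999979, 2.093996169658831E-15),
  Vectors.dense(2.093996169659668E-15, 0.9999999999999979)
).map(Tuple1.apply).toDF("probability")

df.withColumn("prob1", first($"probability"))
  .withColumn("prob2", second($"probability"))
  .drop("probability")
  .show(false)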

I hope you get the desired result.
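As a side note, the confusing "argument 1 requires vector type, however ... is of vector type" message from your original getPOne udf most likely comes from mixing Spark's two vector types: org.apache.spark.mllib.linalg.Vector and org.apache.spark.ml.linalg.Vector are distinct types, and ML pipeline models in Spark 2.x produce the latter. A minimal sketch of the corrected udf:

import org.apache.spark.sql.functions.udf

// Declare the parameter as the ml (not mllib) Vector type,
// matching what model.transform actually produces
val getPOne = udf((v: org.apache.spark.ml.linalg.Vector) => v(1))
predictions.select(getPOne($"probability")).show(false)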


I tried the code you mentioned. Unfortunately, I get the following error - 'withColumn("prob1", $"probability"(0)).withColumn("prob2", $"probability"(1)).drop("probability")' gives 'org.apache.spark.sql.AnalysisException: Can't extract value from probability#29; at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:73) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:617)' – ankursg8


Is the probability column an empty array for some rows? –


Updated my answer, please take a look :) –