使用scala数据帧中的最小值和最大值来寻找正常值
我有一个39列的数据帧,每列都有不同的正常范围。 通过使用正常范围,我想找出正常值,并把0否则把1.使用scala数据帧中的最小值和最大值来寻找正常值
这是我所做的,但我想为39列做。
val test :(Double => Double) = (value: Double) =>
{
if(value >= 45 && value <= 62) 0
else 1
}
但我不明白如何使用不同的值到每一列。
用于离: 我有这样的DF
+--------------------+---------+-------------------------+---------+
|a |b |c |d |
+--------------------+---------+-------------------------+---------+
| 207.0| 40.0| 193.0| 39.0|
| 98.0| 17.0| 193.0| 15.0|
| 207.0| 13.0| 193.0| 17.0|
| 207.0| 26.0| 193.0| 23.0|
| 207.0| 35.0| 193.0| 24.0|
| 207.0| 91.0| 193.0| 45.0|
| 207.0| 40.0| 193.0| 37.0|
| 207.0| 23.0| 193.0| 23.0|
| 207.0| 26.0| 193.0| 22.0|
| 207.0| 39.0| 193.0| 34.0|
我想导致像下面使用范围
col range
a 50-160
b 1-21
c 5-40
d 7-27
如果在范围内的值,则0否则为1
+--------------------+---------+-------------------------+---------+
|a |b |c |d |
+--------------------+---------+-------------------------+---------+
| 1.0| 1.0| 1.0| 1.0|
| 0.0| 0.0| 1.0| 0.0|
| 1.0| 0.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 1.0|
| 1.0| 1.0| 1.0| 1.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 1.0|
I want to do this for 39 columns.(scala/pyspark preferred)
您应该定义一个用户定义的函数(UDF),然后将其应用于您的每个列NT。
这是关于Scala的用户定义函数的文档。它非常完整,我鼓励你阅读它。
下面是摘录,以帮助您快速了解,我想在这里去:
scala> df.withColumn("upper", upper('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
| 0|hello|HELLO|
| 1|world|WORLD|
+---+-----+-----+
// You could have also defined the UDF this way
val upperUDF = udf { s: String => s.toUpperCase }
// or even this way
val upperUDF = udf[String, String](_.toUpperCase)
scala> df.withColumn("upper", upperUDF('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
| 0|hello|HELLO|
| 1|world|WORLD|
+---+-----+-----+
你看你的功能适用于整列,其结果将是一个新的栏目。因此,你的函数应该是这样的:
def isInRange(e: Number, min: Number, max: Number): Boolean = (e < max && e > min)
然后,对于给定的minValue(最小值)和maxValue(最大值),你要做的就是:
myDF.withColumn("isInRange_a", udf(x => isInRange(x, minValue, maxValue).apply(myDF("a")))
你现在能做些什么,在给定的名单套用/数据帧包含(varName中,包括maxValue,minValue(最小值))是:
任一个地图/减少操作,在这里将计算为每列不管它是在给定范围或没有。然后,你会加入一个给定的密钥(我不知道你的问题很多,所以我不能帮你在这里)。这个解决方案可以工作,但随着数据的增长,效率会变得非常低,因为您可能拥有几个相似的键。
无论是递归的操作,其目的是执行类似:
myDF.whithColumn(...).withColumn(...).withColumn(...)
等
第二种解决方案是一个我会因为这会看起来很像键的选择。
你是怎么做到的?
def applyMyUDFRecursively(myDF: DataFrame, List[MyRange]: rangesList): DataFrame =
if (rangesList == null || rangesList.isEmpty) myDF
else applyMyUDFRecursively(
myDF.withColumn(myDF.withColumn("isInRange_" + rangesList.head._0, udf(x => isInRange(x, rangesList.head._1, rangesList.head._2).apply(myDF(rangesList.head._0))), rangesList.tail)
现在,您已应用于所有列,但列可能太多。做这样的事情:
resultDF.drop(rangesList.map(case x => x._0).collect: _*)
公告类型归属到降功能适用于所有的元素时,地图/收集
与VAL MyRange获得的名单内= SEQ(varName中:字符串,最小:数字, max:Number)
例如:为您的数据帧,它应该像这样(简化版本):
def recApply(myDF: DataFrame, cols: List[String]): DataFrame =
if (cols == null || cols.isEmpty) myDF
else recApply(myDF.withColumn(myDF.withColumn("isInRange_" + col.head, udf(x => test(x).apply(myDF(cols.head))), cols.tail)
然后,应用此功能,您的DF和存储您的结果:
val my_result = recApply(myDF, myDF.cols)
请让我知道,如果事情还不清楚,我希望我给你钥匙,让你现在自己处理这个问题,并毫不犹豫地将问题标记为答案,如果这适合你 – belka
我很感谢你的回答这是最详细的答案我曾经得到但仍然没有工作所有39栏可以只显示上面的示例数据,它会更有帮助 –
也,你可以看到我的udf以上 –