Scala中reduceBykey(func: (V, V) => V)时遇到的漏数据问题

Scala学习中对reduceBykey(func: (V, V) => V)的理解

数据:

编号 性别 身高
1 F 170
2 M 178
3 M 174

要求:分别计算各个性别的最高身高和最低身高
代码如下:

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Peopleinfo")
    val sc = new SparkContext(conf)
    val lineRDD = sc.textFile("file:///d:/peopleinfo.txt")
    val man:LongAccumulator = sc.longAccumulator("man")
    val female:LongAccumulator = sc.longAccumulator("female")
    val infoRDD:RDD[(String,(Int,Int))] = lineRDD.map(t => {
      val fields = t.split("\\s+")
      (fields(1), (fields(2).toInt,fields(2).toInt))
    })
    infoRDD.foreach{case (sex,(v1,v2))=>println(sex+"-------"+v1)}
    val retRDD:RDD[(String,(Int,Int))] = infoRDD.reduceByKey((v1, v2) => {
      var max=v1._1
      var min=v1._2
      if (v2._1 > v1._1)
        max=v2._1
      if (v2._1 < v1._2)
        min=v2._1
      (max,min)
    })
    retRDD.foreach{case (sex,(max,min)) => println(s"$sex :(max:$max min:$min)")}
    sc.stop()
  }

运行结果:
Scala中reduceBykey(func: (V, V) => V)时遇到的漏数据问题
结果最小值计算错误,之前对reduceByKey的理解是:(v1,v2)=>v3,在key值相同的values里取出两个值作为v1和v2,v1与v2进行聚合操作,得到的v3会作为下一次操作的v1值,然后再在values取一个值作v2,如此重复。。。
然后试着将local[*]改为了local[1]

val conf = new SparkConf().setMaster("local[1]").setAppName("Peopleinfo")

计算结果正确:
Scala中reduceBykey(func: (V, V) => V)时遇到的漏数据问题
突然想起reduceByKey是通过combineByKeyWithClassTag来实现的,实现过程为:
1,createCombiner: V => C,初始化Combiner
2,mergeValue: (C, V) => C,区内聚合
3,mergeCombiners: (C, C) => C,区与区之间聚合

最后回去分析代码发现,自己在写代码的时候,将每一次聚合时的v2值都当作了未聚合过的值来用,而忽略了最后的区与区之间聚和时v2的值会作为这个区的(max,min)来参与计算,所以导致结果中漏掉了最小值。

修改代码,对v2._2添加一次判断:

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Peopleinfo")
    val sc = new SparkContext(conf)
    val lineRDD = sc.textFile("file:///d:/peopleinfo.txt")
    val man:LongAccumulator = sc.longAccumulator("man")
    val female:LongAccumulator = sc.longAccumulator("female")
    val infoRDD:RDD[(String,(Int,Int))] = lineRDD.map(t => {
      val fields = t.split("\\s+")
      (fields(1), (fields(2).toInt,fields(2).toInt))
    })
    infoRDD.foreach{case (sex,(v1,v2))=>println(sex+"-------"+v1)}
    val retRDD:RDD[(String,(Int,Int))] = infoRDD.reduceByKey((v1, v2) => {
      var max=v1._1
      var min=v1._2
      if (v2._1 > v1._1)
        max=v2._1
      if (v2._1 < v1._2)
        min=v2._1
	  if (v2._1 < v1._2)
        min=v2._1
      (max,min)
    })
    retRDD.foreach{case (sex,(max,min)) => println(s"$sex :(max:$max min:$min)")}
    sc.stop()
  }

总结:reduceByKey的执行过程是:先初始化combiner,再分区内聚合,最后分区间聚合