如何找出最大连续多年使用Scala的每个ID /星火

问题描述:

我有一定的ID与各自多年的经营中的每一行:如何找出最大连续多年使用Scala的每个ID /星火

例子:

ID YEAR 

A1 1999 
A2 2000 
A1 2000 
B1 1998 
A1 2002 

现在,我需要确定的连续多年用于每个ID 结果号,

A1 : 2 because[1999, 2000 ] 

等,

如果你想要一个Spark解决方案,我会选择一个DataFrame。它会混乱,但它是一个有趣的问题:

val testDf = Seq(
    ("A1", 1999), 
    ("A2", 2000), 
    ("A1", 2000), 
    ("A1", 1998), 
    ("A1", 2002), 
    ("B1", 1998) 
).toDF("ID", "YEAR") 

然后,我会进行自连接(第两,实际上):

val selfJoined = testDf.orderBy($"YEAR").join(
    testDf.orderBy($"YEAR").toDF("R_ID", "R_YEAR"), 
    $"R_ID" === $"ID" && $"YEAR" === ($"R_YEAR" - 1), 
    "full_outer" 
).filter($"ID".isNull || $"R_ID".isNull) 

selfJoined.show 
+----+----+----+------+ 
| ID|YEAR|R_ID|R_YEAR| 
+----+----+----+------+ 
|null|null| A2| 2000| 
| A2|2000|null| null| 
|null|null| B1| 1998| 
| B1|1998|null| null| 
|null|null| A1| 1998| 
| A1|2000|null| null| 
|null|null| A1| 2002| 
| A1|2002|null| null| 
+----+----+----+------+ 

正如你可以从上面看到,我们现在有连续几年的开始和结束日期。 R_YEAR,当不是null时,包含连续年份的“运行”开始。接下来的一行,YEAR是该年的结束。如果我更熟练地使用Window功能,我可能会使用lag将记录缝合在一起,但我并不这样做。我会做另一个自连接,然后groupBy,然后一些数学在select,然后又groupBy

selfJoined.filter($"ID".isNull).as("a").join(
    selfJoined.filter($"R_ID".isNull).as("b"), 
    $"a.R_ID" === $"b.ID" && $"a.R_YEAR" <= $"b.YEAR" 
).groupBy($"a.R_ID", $"a.R_YEAR").agg(min($"b.YEAR") as "last_YEAR") 
.select($"R_ID" as "ID", $"last_YEAR" - $"R_YEAR" + 1 as "inarow") 
.groupBy($"ID").agg(max($"inarow") as "MAX").show 
+---+---+ 
| ID|MAX| 
+---+---+ 
| B1| 1| 
| A1| 3| 
| A2| 1| 
+---+---+ 

Wheee!

+0

感谢您的时间和步骤的详细解释,真的很感谢解决方案!但是,我期望的输出应该有A1 - > 2,因为A1有1999,2000,2002(这里2002年不是连续年)。 – Mithunram

+0

看看我的数据,它是不同的。为了更好的情况,我又增加了A1的一年。 –

+0

大卫格里芬你可以解释逻辑部分,多个自连接和它背后的算术。厌倦了我的最好把握它看起来乏味,并想确认我的解释是否正确。 – Mithunram

我会尝试的东西沿着这些路线:

scala> case class DataRow(id: String, year: Int) 
defined class DataRow 
scala> val data = Seq(
      DataRow("A1", 1999), 
      DataRow("A2", 2000), 
      DataRow("A1", 2000), 
      DataRow("B1", 1998), 
      DataRow("A1", 2002) 
     ) 
data: Seq[DataRow] = List(DataRow("A1", 1999), DataRow("A2", 2000), DataRow("A1", 2000), DataRow("B1", 1998), DataRow("A1", 2002)) 
scala> data.groupBy(_.id).mapValues { rows => 
      val years = rows.map(_.year) 
      val firstYear = years.head 
      years.zipWithIndex.takeWhile { case (y, i) => y == firstYear + i }.size 
     } 
res1: Map[String, Int] = Map("B1" -> 1, "A2" -> 1, "A1" -> 2) 

这个计算的连续多年的每个ID的最大数,假设它看到的第一年,是罢工的最早日期。在val years行插入.sorted是不是这种情况。

+0

那不使用'Spark'。 –

+0

哦对,我完全不在话题上,对不起! –

如果你不想与星火SQL打扰(在我看来,这是矫枉过正的任务),你可以简单地使用groupByKey(而每个ID可能年数是合理的)

val rdd = sc.parallelize(Seq(
    ("A1", 1999), 
    ("A2", 2000), 
    ("A1", 2000), 
    ("A1", 1998), 
    ("A1", 2002), 
    ("B1", 1998) 
)) 

def findMaxRange(l: Iterable[Int]) = { 
    val ranges = mutable.ArrayBuffer[Int](1) 
    l.toSeq.sorted.distinct.sliding(2).foreach { case y1 :: tail => 
    if (tail.nonEmpty) { 
     val y2 = tail.head 
     if (y2 - y1 == 1) ranges(ranges.size - 1) += 1 
     else ranges += 1 
    } 
    } 
    ranges.max 
} 

rdd1.groupByKey.map(r => (r._1, findMaxRange(r._2))).collect() 

res7: Array[(String, Int)] = Array((A1,3), (A2,1), (B1,1))