星火SQL窗口功能向前看,复杂的功能

问题描述:

,我有以下数据:星火SQL窗口功能向前看,复杂的功能

+-----+----+-----+ 
|event|t |type | 
+-----+----+-----+ 
| A |20 | 1 | 
| A |40 | 1 | 
| B |10 | 1 | 
| B |20 | 1 | 
| B |120 | 1 | 
| B |140 | 1 | 
| B |320 | 1 | 
| B |340 | 1 | 
| B |360 | 7 | 
| B |380 | 1 | 
+-----+-----+----+ 

而且我想是这样的:

+-----+----+----+ 
|event|t |grp | 
+-----+----+----+ 
| A |20 |1 | 
| A |40 |1 | 
| B |10 |2 | 
| B |20 |2 | 
| B |120 |3 | 
| B |140 |3 | 
| B |320 |4 | 
| B |340 |4 | 
| B |380 |5 | 
+-----+----+----+ 

规则:
1)集团的所有值一起距离彼此至少50ms。 (列t)并属于同一事件。
2)当出现类型7的一行时也要剪切并删除该行。 (见最后一行)

的第一条规则我可以​​从this thread答案实现:

代码:

val windowSpec= Window.partitionBy("event").orderBy("t") 

val newSession = (coalesce(
    ($"t" - lag($"t", 1).over(windowSpec)), 
    lit(0) 
) > 50).cast("bigint") 

val sessionized = df.withColumn("session", sum(newSession).over(userWindow)) 

我不得不说我无法弄清楚它是如何工作和唐不知道如何修改它,使规则2也可以运作... 希望有人能给我一些有用的提示。

我的尝试:

val newSession = (coalesce(
    ($"t" - lag($"t", 1).over(windowSpec)), 
    lit(0) 
) > 50 || lead($"type",1).over(windowSpec) =!= 7).cast("bigint") 

但只有发生错误:“必须遵循的方法;不能按照org.apache.spark.sql.Column val grp = (coalesce(

+1

if you wi ndow函数正在工作,然后只是应用类型== 7过滤出行之前应用您的窗口功能 –

+0

@RameshMaharjan我不认为这是可行的,因为如果你先删除它,你不能“切”了 –

+0

为什么'grp' for事件B在你预期的结果中从2开始? –

这应该做的伎俩:

val newSession = (coalesce(
    ($"t" - lag($"t", 1).over(win)), 
    lit(0) 
) > 50 
    or $"type"===7) // also start new group in this case 
.cast("bigint") 

df.withColumn("session", sum(newSession).over(win)) 
.where($"type"=!=7) // remove these rows 
.orderBy($"event",$"t") 
.show 

给出:

+-----+---+----+-------+ 
|event| t|type|session| 
+-----+---+----+-------+ 
| A| 20| 1|  0| 
| A| 40| 1|  0| 
| B| 10| 1|  0| 
| B| 20| 1|  0| 
| B|120| 1|  1| 
| B|140| 1|  1| 
| B|320| 1|  2| 
| B|340| 1|  2| 
| B|380| 1|  3| 
+-----+---+----+-------+ 
+0

谢谢你的工作很好。这很简单... – Boendal