Spark的延迟执行再理解
首先,大家可以猜测下如下代码的执行结果,是否会报错?
val df = Seq((0, "ben", "student"),(1, "jim", "teacher")).toDF("id", "name", "job")
val df2 = df.select("id", "name")
val df3 = df2.filter("job = 'teacher'") // 是否会报错?
df3.show
val df4 = df.select("id", "name").filter("job = 'teacher'") // 是否会报错?
df4.show
val df5 = df3.filter("job = 'teacher'").select("id", "name") // 是否会报错?
df5.show
比较直观的反应是df3
可能会报错,因为df2
中已经不包含job
列了,此时会报列不存在的错误。
但再看下df4
的生成语句,隐隐感觉应该是没有问题的。
再看下df5
的生成语句,顿时有种豁免开朗的感觉,这应该没有问题。
以下是语句实际执行情况:
再回过头来看上面三种写法之间的差别,如果是按正常的顺序执行来看,第一种写法会有问题。但spark的计算都是延迟执行的,只有action
操作才会触发生成实际的执行任务。这样一看,上面三种写法其实是完全一样的。
接下来再看下下面这段代码的执行情况:
val df = Seq((0, "ben", "student"),(1, "jim", "teacher")).toDF("id", "name", "job")
val df2 = df.select("id", "name").select("job") // 是否会报错?
val df3 = df.select("id", "name").groupBy("job") // 是否会报错?
为什么df2
会报错,两个select语句为什么不是最终被优化成一个并集呢?
为什么df3
会报错,groupBy
会为什么是基于select出来的结果,而不是针对原始数据df
进行呢?
从执行结果来看,两种执行方式最终实现时都采用的子查询的方式,具体与spark sql的解析实现有关。
为什么会对这些语句的执行情况有疑惑,最根本的问题还是思路没有从即时执行转换到延迟执行上来。