Spark SQL timeout

Problem description:

I am trying to run a relatively simple Spark SQL command on a standalone Spark cluster:

select a.name, b.name, s.score 
from score s 
inner join A a on a.id = s.a_id 
inner join B b on b.id = s.b_id 
where pmod(a.id, 3) != 3 and pmod(b.id, 3) != 0 

The table sizes are as follows:

A: 25,000 
B: 2,500,000 
score: 25,000,000 

So from this I expect a result of 25 million rows. I want to run this query with Spark SQL and then process each row. Here is the relevant Spark code:

import org.apache.spark.sql.hive.HiveContext
// In spark-shell, sc is the pre-created SparkContext
val sqlContext = new HiveContext(sc)
val sql = "<above SQL>"
sqlContext.sql(sql).first

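This command runs fine when the score table has 200,000 rows, but with the current size it no longer completes. Here are the relevant logs: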

14/12/04 16:35:14 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems. 
14/12/04 16:35:43 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems. 
14/12/04 16:36:24 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems. 
14/12/04 16:37:11 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems. 
14/12/04 16:38:13 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems. 
14/12/04 16:39:19 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems. 
14/12/04 16:39:48 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems. 
14/12/04 16:40:08 WARN MemoryStore: Not enough space to store block broadcast_12 in memory! Free memory is 1938057068 bytes. 
14/12/04 16:40:08 WARN MemoryStore: Persisting block broadcast_12 to disk instead. 
java.util.concurrent.TimeoutException: Futures timed out after [5 minutes] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
    at scala.concurrent.Await$.result(package.scala:107) 
    at org.apache.spark.sql.execution.BroadcastHashJoin.execute(joins.scala:431) 
    at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:42) 
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:111) 
    at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) 
    at org.apache.spark.sql.SchemaRDD.take(SchemaRDD.scala:440) 
    at org.apache.spark.sql.SchemaRDD.take(SchemaRDD.scala:103) 
    at org.apache.spark.rdd.RDD.first(RDD.scala:1092) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:20) 
    at $iwC$$iwC$$iwC.<init>(<console>:25) 
    at $iwC$$iwC.<init>(<console>:27) 
    at $iwC.<init>(<console>:29) 
    at <init>(<console>:31) 
    at .<init>(<console>:35) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) 
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

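My first thought was to increase this timeout, but as shown here that does not look possible without recompiling the source. In the parent directory I also see several other join implementations, but I am not sure how to make Spark use a different type of join.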

I also tried to address the earlier warning about persisting the block to disk by increasing spark.executor.memory to 10g, but that did not fix the problem.

Does anyone know how I can get this query to run?


Seeing a similar issue. Did you ever find a solution for this? – NightWolf 2014-12-23 04:00:23

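You are probably hitting the broadcast join timeout. For some reason this is controlled by an undocumented configuration option, spark.sql.broadcastTimeout (default 300s).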
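A minimal sketch of raising that timeout from spark-shell, assuming a Spark version in which spark.sql.broadcastTimeout is read from the SQL configuration (the question suggests the version it uses had the 5-minute limit fixed in the source). The value is in seconds; 1200 is an arbitrary example, not a recommended setting.

```scala
import org.apache.spark.sql.hive.HiveContext

// Assumes spark-shell, where sc is the pre-created SparkContext.
val sqlContext = new HiveContext(sc)

// Raise the broadcast join timeout from the default 300 s to 1200 s.
// 1200 is an illustrative value; pick one that fits the size of the broadcast table.
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")

val sql = "<above SQL>" // placeholder for the query from the question
sqlContext.sql(sql).first
```

The same setting can also be issued as a SQL statement, e.g. sqlContext.sql("SET spark.sql.broadcastTimeout=1200").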

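So you can either try increasing it (that worked for us), or stop Spark from doing a broadcast join at all (although broadcasting is the recommended way to join a small table to a large one, see https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05%20BroadcastHashJoin%20-%20scala.html).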
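Avoiding the broadcast join can be sketched as follows, again assuming spark-shell with sc available. spark.sql.autoBroadcastJoinThreshold is the maximum table size in bytes that Spark will broadcast in a join; setting it to -1 disables automatic broadcasting, so the planner falls back to a shuffle-based join (which one depends on the Spark version).

```scala
import org.apache.spark.sql.hive.HiveContext

// Assumes spark-shell, where sc is the pre-created SparkContext.
val sqlContext = new HiveContext(sc)

// -1 disables automatic broadcast joins: no table is treated as small enough
// to broadcast, so both sides of each join are shuffled instead.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

val sql = "<above SQL>" // placeholder for the query from the question
sqlContext.sql(sql).first
```

This sidesteps the broadcast and its timeout at the cost of shuffling both join inputs, which is usually slower when one side really is small.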