Spark Spark待处理批处理
问题描述:
我在运行Spark Streaming
应用程序,该应用程序从Kafka
(使用Direct Stream
方法)读取数据并将结果发布回Kafka
。应用程序的输入速率以及应用程序的吞吐量保持稳定大约一两个小时。之后,我开始在很长一段时间内看到在Active Batches
队列中保留的批次(持续30分钟+)。该Spark driver
日志表明以下两种类型的错误,而这些错误发生的时间与卡住批次的开始时间一致得好:Spark Spark待处理批处理
第一个错误类型
ERROR LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
二错误键入
ERROR StreamingListenerBus: Listener StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 1501806558000 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:75)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1279)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
但是,我不知道如何解释这些错误,尽管广泛的在线搜索,我合作没有找到与此相关的任何有用信息。
问题
- 什么这些错误是什么意思?它们是否表示资源限制(例如:CPU,内存等)?
- 解决这些错误的最佳方法是什么?
在此先感谢。
答
您的批次持续时间是不是少于实际批处理时间?默认批处理队列大小为1000,因此可以溢出Spark Spark批处理队列。
请考虑重新说明这一点。这看起来像一个平庸的评论,而不是真正回答问题的东西。 – GhostCat