SnappyData:java.lang.OutOfMemoryError:GC开销超过限制
我对S3兽人数据的1.2GB,我试图做同样的下列内容:SnappyData:java.lang.OutOfMemoryError:GC开销超过限制
1)高速缓存活泼的群集上的数据[snappydata 0.9]
2)上的高速缓存的数据集
3)比较用火花的性能执行一个查询GROUPBY 2.0.0
我使用的是64 GB/8芯机和用于斯纳皮配置集群如下:
012现在$ cat locators
localhost
$cat leads
localhost -heap-size=4096m -spark.executor.cores=1
$cat servers
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
,我已经写了一个小python脚本,缓存从S3兽人数据并运行通过查询一个简单的基团,其是如下:
from pyspark.sql.snappy import SnappyContext
from pyspark import SparkContext,SparkConf
conf = SparkConf().setAppName('snappy_sample')
sc = SparkContext(conf=conf)
sqlContext = SnappyContext(sc)
sqlContext.sql("CREATE EXTERNAL TABLE if not exists my_schema.my_table using orc options(path 's3a://access_key:[email protected]_name/path')")
sqlContext.cacheTable("my_schema.my_table")
out = sqlContext.sql("select * from my_schema.my_table where (WeekId = '1') order by sum_viewcount desc limit 25")
out.collect()
上述脚本用执行下面的命令:
spark-submit --master local[*] snappy_sample.py
,我得到以下错误:
17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_5 in memory! (computed 21.2 MB so far)
17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_0 in memory! (computed 21.2 MB so far)
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_5 to disk instead.
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_0 to disk instead.
17/10/04 02:50:47 WARN storage.BlockManager: Putting block rdd_2_2 failed due to an exception
17/10/04 02:50:47 WARN storage.BlockManager: Block rdd_2_2 could not be removed as it was not found on disk or in memory
17/10/04 02:50:47 ERROR executor.Executor: Exception in task 2.0 in stage 0.0 (TID 2)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96)
at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
17/10/04 02:50:47 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-2,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96)
at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
17/10/04 02:50:48 INFO snappystore: VM is exiting - shutting down distributed system
除了上面的错误,我该如何检查数据是否被缓存在快速集群中?
1)首先,它看起来不像是用python脚本连接到SnappyData集群,而是以本地模式运行它。在这种情况下,python脚本启动的JVM会与预期的OOM一起失败。当使用Python连接到SnappyData集群中的“smart connector”模式:
spark-submit --master local[*] --conf snappydata.connection=locator:1527 snappy_sample.py
主机:端口以上是定位器主机和端口上节俭服务器正在运行(1527默认情况下)。其次,你有的例子只是使用Spark来缓存。如果你想使用SnappyData,负载成列的表:
from pyspark.sql.snappy import SnappySession
from pyspark import SparkContext,SparkConf
conf = SparkConf().setAppName('snappy_sample')
sc = SparkContext(conf=conf)
session = SnappySession(sc)
session.sql("CREATE EXTERNAL TABLE if not exists my_table using orc options(path 's3a://access_key:[email protected]_name/path')")
session.table("my_table").write.format("column").saveAsTable("my_column_table")
out = session.sql("select * from my_column_table where (WeekId = '1') order by sum_viewcount desc limit 25")
out.collect()
还要注意使用“SnappySession”,而不是背景,因为星火2.0.x的它被废弃与Spark缓存进行比较时,可以在单独的脚本中使用“cacheTable”并针对上游的Spark运行。请注意,“cacheTable”将缓慢执行缓存,这意味着第一个查询将执行实际的缓存,因此第一次查询运行将非常缓慢,但后续的应该更快。
3)更新到1.0版本,有很多改进,而不是使用0.9。在启动群集之前,您还需要将hadoop-aws-2.7.3和aws-java-sdk-1.7.4添加到conf/leads和conf/servers中的“-classpath”(或放入产品的jars目录中)。