星火电子病历“超出内存限制”可用于检查点/缓存工作
是我缓存的理解错了吗?在我所有的转换之后,得到的RDD非常小,比如1GB。它计算的数据非常大,大小约700 GB。星火电子病历“超出内存限制”可用于检查点/缓存工作
我要运行的逻辑阅读成千上万的相当大的文件,所有计算小得多导致RDD。每次迭代都会处理下一批400个文件,这些文件在读入时可能会炸毁大约700 GB的大小。传入的RDD以相同的方式进行处理(读取和转换),然后与积累的RDD合并。 I 缓存和检查点每次迭代后(也是非运行(阻塞= true)旧版本的结果rdd),以便我可以削减RDD谱系,这样我就不必重新计算结果出错,并节省执行人员的空间。 所以,我想,在任何时候我真的只需要1 GB *迭代+〜750GB的内存总容量为我的工作,而1.6 TB应该是绰绰有余的数量。但显然我误解了一些东西。
在每次迭代中,GC的时间越来越长。 Spark UI显示执行者位于红色区域(在GC上花费时间超过10%)。然后,整个工作或许未能在第三或第四迭代与像内存限制超过消息,失落执行人/无路径执行人,和纱线杀死了我的执行人。我认为通过缓存和检查点,我为执行者节省了大量空间。我只是不明白是否有某种内存泄漏? 为什么内存继续填满?
我在EMR运行星火2.1.1 m3.large实例。我的群集大小限制在〜1.6TB。我用下面的配置中运行:
driver-memory 8g
deploy-mode cluster
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=100
spark.dynamicAllocation.maxExecutors=200
spark.shuffle.service.enabled=true
executor-cores 4
executor-memory 8g
什么我的代码看起来有点像:
var accRdd = <empty>
val batchSize = 400
var iteration = 1
filesToIngest.grouped(batchSize).foreach {
val transformedRdd = transform(accRdd).reduceByKey((row1, row2) =>
combine(row1, row2)
)
val oldAccRdd = accRdd
accRdd = accRdd.union(transformedRdd).reduceByKey((row1, row2) =>
combine(row1, row2)
).coalesce(5 + i)
accRdd.persist(MEMORY_AND_DISK_SER)
accRdd.checkpoint()
oldAccRdd.unpersist(blocking = true) // I assume this will ensure all references to this cleared from memory
log_info(s"Total row count on iteration: ${accRdd.count()}")
iteration += 1
}
我已经按照此建议:https://github.com/deeplearning4j/nd4j/issues/1251,并正尝试以避免调整其它配置变量相关以gc,记忆分数和jvm。再次,我正在寻找对可能发生的事情的解释,以及我对缓存/检查点的假设可能是错误的。 谢谢!
你可能想看看一些从我们的记忆页的建议: https://deeplearnin4j.org/memory
和https://deeplearning4j.org/spark
一般来说,deeplearning4j都有自己的堆外的内存。你应该使用更多的执行器来设置较小的批处理大小,注意javacpp关闭堆配置,并将火花内存设置为允许的范围。