数据框大小保持增长,尽管其数量并没有增长
我需要一些帮助
当我使用的循环更新数据帧我有Apache的火花的问题。其大小保持,尽管其数量并没有增长数据框大小保持增长,尽管其数量并没有增长
ü可以建议我如何解决它或引导我,为什么我的数据框大小一直都在不断增长的时间无限? (T^T)上的本地[6]使用spark2.0.1
@this //
我的程序运行是我的代码
def main(args: Array[String]): Unit = {
val df1 = initial dataframe(read from db)
while(){
val word_count_df = processAndCountText() // query data from database and do word count
val temp_df1 = update(df1,word_count_df)
temp_df1.persist(StorageLevel.MEMORY_AND_DISK)
df1.unpersist()
df1 = temp_df1
println(temp_df1.count())
println(s"${SizeEstimator.estimate(temp_df1)/1073741824.0} GB")
}
}
//被修改
,更新一些行这是更新功能在word_count_df中有关键字。
我试图将其分割为2个dataframes并分别计算它,然后返回2个dataframes工会,但它需要太多的时间,因为它需要启用“spark.sql.crossJoin.enabled”
def update(u_stateful_df : DataFrame, word_count_df : DataFrame) : DataFrame = {
val run_time = current_end_time_m - start_time_ms/60000
val calPenalty = udf { (last_update_duration: Long, run_time: Long) => calculatePenalty(last_update_duration, run_time) }
//calculatePenalty is simple math function using for loop and return double
val calVold = udf { (v_old: Double, penalty_power: Double) => v_old * Math.exp(penalty_power) }
//(word_new,count_new)
val word_count_temp_df = word_count_df
.withColumnRenamed("word", "word_new")
.withColumnRenamed("count", "count_new")
//u_stateful_df (word,u,v,a,last_update,count)
val state_df = u_stateful_df
.join(word_count_temp_df, u_stateful_df("word") === word_count_temp_df("word_new"), "outer")
.na.fill(Map("last_update" -> start_time_ms/60000))
.na.fill(0.0)
.withColumn("word", when(col("word").isNotNull, col("word")).otherwise(col("word_new")))
.withColumn("count", when(col("word_new").isNotNull, col("count_new")).otherwise(-1))
.drop("count_new")
.withColumn("current_end_time_m", lit(current_end_time_m))
.withColumn("last_update_duration", col("current_end_time_m") - col("last_update"))
.filter(col("last_update_duration") < ResourceUtility.one_hour_duration_ms/60000)
.withColumn("run_time", when(col("word_new").isNotNull, lit(run_time)))
.withColumn("penalty_power", when(col("word_new").isNotNull, calPenalty(col("last_update_duration"), col("run_time"))))
.withColumn("v_old_penalty", when(col("word_new").isNotNull, calVold(col("v"), col("penalty_power"))))
.withColumn("v_new", when(col("word_new").isNotNull, col("count")/run_time))
.withColumn("v_sum", when(col("word_new").isNotNull, col("v_old_penalty") + col("v_new")))
.withColumn("a", when(col("word_new").isNotNull, (col("v_sum") - col("v"))/col("last_update_duration")).otherwise(col("a")))
.withColumn("last_update", when(col("word_new").isNotNull, lit(current_end_time_m)).otherwise(col("last_update")))
.withColumn("u", when(col("word_new").isNotNull, col("v")).otherwise(col("u")))
.withColumn("v", when(col("word_new").isNotNull, col("v_sum")).otherwise(col("v")))
state_df.select("word", "u", "v", "a", "last_update", "count")
}
@this是我的日志
u_stateful_df : 1408665
size of dataframe size : 0.8601360470056534 GB
u_stateful_df : 1408665
size of dataframe size : 1.3347024470567703 GB
u_stateful_df : 268498
size of dataframe size : 1.5012029185891151 GB
u_stateful_df : 147232
size of dataframe size : 3.287795402109623 GB
u_stateful_df : 111950
size of dataframe size : 4.761911824345589 GB
....
....
u_stateful_df : 72067
size of dataframe size : 14.510709017515182 GB
@this是日志当我写到文件
I save df1 as CSV in the file system. below is the size of dataframe in file system, count and size(track by org.apache.spark.util.SizeEstimator).
csv size 84.2 MB
u_stateful_df : 1408665
size of dataframe size : 0.4460855945944786 GB
csv size 15.2 MB
u_stateful_df : 183315
size of dataframe size : 0.522 GB
csv size 9.96 MB
u_stateful_df : 123381
size of dataframe size : 0.630GB
csv size 4.63 MB
u_stateful_df : 56896
size of dataframe size : 0.999 GB
...
...
...
csv size 3.47 MB
u_stateful_df : 43104
size of dataframe size : 3.1956922858953476 GB
它看起来像内部引发一些泄漏。通常当你在数据帧调用persist
或cache
然后count
星火生成结果,并将其存储内的分布式内存或磁盘上,但也知道整个执行计划,以重建数据帧丢失遗嘱执行人或某事的情况。但它不应该采取这么大的空间...
据我所知,没有选项“崩溃”数据框(告诉星火忘记整个执行计划)等,简单地写入到存储,然后从该读存储。
谢谢先生。 (^ 0 ^)// 有没有解决方法来解决它? 我需要在使用spark的while循环中运行此算法来验证sparkstreaming的结果(我也代码spark流和它共享相同的API) –
将数据写入临时目录并在循环中读取它们(而不是保留在缓存中),我想这是唯一的方法... – Mariusz
我想解决的办法是内部'update'功能,你可以发布它的代码? – Mariusz
到Mariusz,thx为您的关注。我添加更新函数来发布你的请求>
谢谢。我找不到'update'里面的bug,我期望找到让行更大或者内存泄漏的东西......请尝试另一个实验 - 而不是使用'SizeEstimator',只是将这些数据以行格式写入文件系统(csv/json)并检查其大小是否增长。 – Mariusz