数据框大小保持增长,尽管其数量并没有增长

问题描述:

我需要一些帮助
当我使用的循环更新数据帧我有Apache的火花的问题。其大小保持,尽管其数量并没有增长数据框大小保持增长,尽管其数量并没有增长

ü可以建议我如何解决它或引导我,为什么我的数据框大小一直都在不断增长的时间无限? (T^T)上的本地[6]使用spark2.0.1

@this //
我的程序运行是我的代码

def main(args: Array[String]): Unit = { 
    val df1 = initial dataframe(read from db) 
    while(){ 
     val word_count_df = processAndCountText() // query data from database and do word count 
     val temp_df1 = update(df1,word_count_df) 
     temp_df1.persist(StorageLevel.MEMORY_AND_DISK) 
     df1.unpersist() 
     df1 = temp_df1 

     println(temp_df1.count()) 
     println(s"${SizeEstimator.estimate(temp_df1)/1073741824.0} GB") 
    } 
} 

//被修改
,更新一些行这是更新功能在word_count_df中有关键字。
我试图将其分割为2个dataframes并分别计算它,然后返回2个dataframes工会,但它需要太多的时间,因为它需要启用“spark.sql.crossJoin.enabled”

def update(u_stateful_df : DataFrame, word_count_df : DataFrame) : DataFrame = { 
    val run_time = current_end_time_m - start_time_ms/60000 
    val calPenalty = udf { (last_update_duration: Long, run_time: Long) => calculatePenalty(last_update_duration, run_time) } 
    //calculatePenalty is simple math function using for loop and return double 
    val calVold = udf { (v_old: Double, penalty_power: Double) => v_old * Math.exp(penalty_power) } 


    //(word_new,count_new) 
    val word_count_temp_df = word_count_df 
      .withColumnRenamed("word", "word_new") 
      .withColumnRenamed("count", "count_new") 

    //u_stateful_df (word,u,v,a,last_update,count) 
    val state_df = u_stateful_df 
      .join(word_count_temp_df, u_stateful_df("word") === word_count_temp_df("word_new"), "outer") 
      .na.fill(Map("last_update" -> start_time_ms/60000)) 
      .na.fill(0.0) 
      .withColumn("word", when(col("word").isNotNull, col("word")).otherwise(col("word_new"))) 
      .withColumn("count", when(col("word_new").isNotNull, col("count_new")).otherwise(-1)) 
      .drop("count_new") 
      .withColumn("current_end_time_m", lit(current_end_time_m)) 
      .withColumn("last_update_duration", col("current_end_time_m") - col("last_update")) 
      .filter(col("last_update_duration") < ResourceUtility.one_hour_duration_ms/60000) 
      .withColumn("run_time", when(col("word_new").isNotNull, lit(run_time))) 
      .withColumn("penalty_power", when(col("word_new").isNotNull, calPenalty(col("last_update_duration"), col("run_time")))) 
      .withColumn("v_old_penalty", when(col("word_new").isNotNull, calVold(col("v"), col("penalty_power")))) 
      .withColumn("v_new", when(col("word_new").isNotNull, col("count")/run_time)) 
      .withColumn("v_sum", when(col("word_new").isNotNull, col("v_old_penalty") + col("v_new"))) 
      .withColumn("a", when(col("word_new").isNotNull, (col("v_sum") - col("v"))/col("last_update_duration")).otherwise(col("a"))) 
      .withColumn("last_update", when(col("word_new").isNotNull, lit(current_end_time_m)).otherwise(col("last_update"))) 
      .withColumn("u", when(col("word_new").isNotNull, col("v")).otherwise(col("u"))) 
      .withColumn("v", when(col("word_new").isNotNull, col("v_sum")).otherwise(col("v"))) 

    state_df.select("word", "u", "v", "a", "last_update", "count") 
} 

@this是我的日志

u_stateful_df : 1408665 
size of dataframe size : 0.8601360470056534 GB 

u_stateful_df : 1408665 
size of dataframe size : 1.3347024470567703 GB 

u_stateful_df : 268498 
size of dataframe size : 1.5012029185891151 GB 

u_stateful_df : 147232 
size of dataframe size : 3.287795402109623 GB 

u_stateful_df : 111950 
size of dataframe size : 4.761911824345589 GB 

.... 
.... 

u_stateful_df : 72067 
size of dataframe size : 14.510709017515182 GB 

@this是日志当我写到文件

I save df1 as CSV in the file system. below is the size of dataframe in file system, count and size(track by org.apache.spark.util.SizeEstimator).  



csv size 84.2 MB  
u_stateful_df : 1408665  
size of dataframe size : 0.4460855945944786 GB  



csv size 15.2 MB  
u_stateful_df : 183315  
size of dataframe size : 0.522 GB  



csv size 9.96 MB  
u_stateful_df : 123381  
size of dataframe size : 0.630GB  



csv size 4.63 MB  
u_stateful_df : 56896  
size of dataframe size : 0.999 GB 

... 
... 
... 

csv size 3.47 MB 
u_stateful_df : 43104 
size of dataframe size : 3.1956922858953476 GB 
+0

我想解决的办法是内部'update'功能,你可以发布它的代码? – Mariusz

+0

到Mariusz,thx为您的关注。我添加更新函数来发布你的请求>

+0

谢谢。我找不到'update'里面的bug,我期望找到让行更大或者内存泄漏的东西......请尝试另一个实验 - 而不是使用'SizeEstimator',只是将这些数据以行格式写入文件系统(csv/json)并检查其大小是否增长。 – Mariusz

它看起来像内部引发一些泄漏。通常当你在数据帧调用persistcache然后count星火生成结果,并将其存储内的分布式内存或磁盘上,但也知道整个执行计划,以重建数据帧丢失遗嘱执行人或某事的情况。但它不应该采取这么大的空间...

据我所知,没有选项“崩溃”数据框(告诉星火忘记整个执行计划)等,简单地写入到存储,然后从该读存储。

+0

谢谢先生。 (^ 0 ^)// 有没有解决方法来解决它? 我需要在使用spark的while循环中运行此算法来验证sparkstreaming的结果(我也代码spark流和它共享相同的API) –

+0

将数据写入临时目录并在循环中读取它们(而不是保留在缓存中),我想这是唯一的方法... – Mariusz