2016-11-15 50 views
1

我需要一些幫助
當我使用的循環更新數據幀我有Apache的火花的問題。其大小保持,儘管其數量並沒有增長數據框大小保持增長,儘管其數量並沒有增長

ü可以建議我如何解決它或引導我,爲什麼我的數據框大小一直都在不斷增長的時間無限? (T^T)上的本地[6]使用spark2.0.1

@this //
我的程序運行是我的代碼

def main(args: Array[String]): Unit = { 
    val df1 = initial dataframe(read from db) 
    while(){ 
     val word_count_df = processAndCountText() // query data from database and do word count 
     val temp_df1 = update(df1,word_count_df) 
     temp_df1.persist(StorageLevel.MEMORY_AND_DISK) 
     df1.unpersist() 
     df1 = temp_df1 

     println(temp_df1.count()) 
     println(s"${SizeEstimator.estimate(temp_df1)/1073741824.0} GB") 
    } 
} 

//被修改
,更新一些行這是更新功能在word_count_df中有關鍵字。
我試圖將其分割爲2個dataframes並分別計算它,然後返回2個dataframes工會,但它需要太多的時間,因爲它需要啓用「spark.sql.crossJoin.enabled」

def update(u_stateful_df : DataFrame, word_count_df : DataFrame) : DataFrame = { 
    val run_time = current_end_time_m - start_time_ms/60000 
    val calPenalty = udf { (last_update_duration: Long, run_time: Long) => calculatePenalty(last_update_duration, run_time) } 
    //calculatePenalty is simple math function using for loop and return double 
    val calVold = udf { (v_old: Double, penalty_power: Double) => v_old * Math.exp(penalty_power) } 


    //(word_new,count_new) 
    val word_count_temp_df = word_count_df 
      .withColumnRenamed("word", "word_new") 
      .withColumnRenamed("count", "count_new") 

    //u_stateful_df (word,u,v,a,last_update,count) 
    val state_df = u_stateful_df 
      .join(word_count_temp_df, u_stateful_df("word") === word_count_temp_df("word_new"), "outer") 
      .na.fill(Map("last_update" -> start_time_ms/60000)) 
      .na.fill(0.0) 
      .withColumn("word", when(col("word").isNotNull, col("word")).otherwise(col("word_new"))) 
      .withColumn("count", when(col("word_new").isNotNull, col("count_new")).otherwise(-1)) 
      .drop("count_new") 
      .withColumn("current_end_time_m", lit(current_end_time_m)) 
      .withColumn("last_update_duration", col("current_end_time_m") - col("last_update")) 
      .filter(col("last_update_duration") < ResourceUtility.one_hour_duration_ms/60000) 
      .withColumn("run_time", when(col("word_new").isNotNull, lit(run_time))) 
      .withColumn("penalty_power", when(col("word_new").isNotNull, calPenalty(col("last_update_duration"), col("run_time")))) 
      .withColumn("v_old_penalty", when(col("word_new").isNotNull, calVold(col("v"), col("penalty_power")))) 
      .withColumn("v_new", when(col("word_new").isNotNull, col("count")/run_time)) 
      .withColumn("v_sum", when(col("word_new").isNotNull, col("v_old_penalty") + col("v_new"))) 
      .withColumn("a", when(col("word_new").isNotNull, (col("v_sum") - col("v"))/col("last_update_duration")).otherwise(col("a"))) 
      .withColumn("last_update", when(col("word_new").isNotNull, lit(current_end_time_m)).otherwise(col("last_update"))) 
      .withColumn("u", when(col("word_new").isNotNull, col("v")).otherwise(col("u"))) 
      .withColumn("v", when(col("word_new").isNotNull, col("v_sum")).otherwise(col("v"))) 

    state_df.select("word", "u", "v", "a", "last_update", "count") 
} 

@this是我的日誌

u_stateful_df : 1408665 
size of dataframe size : 0.8601360470056534 GB 

u_stateful_df : 1408665 
size of dataframe size : 1.3347024470567703 GB 

u_stateful_df : 268498 
size of dataframe size : 1.5012029185891151 GB 

u_stateful_df : 147232 
size of dataframe size : 3.287795402109623 GB 

u_stateful_df : 111950 
size of dataframe size : 4.761911824345589 GB 

.... 
.... 

u_stateful_df : 72067 
size of dataframe size : 14.510709017515182 GB 

@this是日誌當我寫到文件

I save df1 as CSV in the file system. below is the size of dataframe in file system, count and size(track by org.apache.spark.util.SizeEstimator).  



csv size 84.2 MB  
u_stateful_df : 1408665  
size of dataframe size : 0.4460855945944786 GB  



csv size 15.2 MB  
u_stateful_df : 183315  
size of dataframe size : 0.522 GB  



csv size 9.96 MB  
u_stateful_df : 123381  
size of dataframe size : 0.630GB  



csv size 4.63 MB  
u_stateful_df : 56896  
size of dataframe size : 0.999 GB 

... 
... 
... 

csv size 3.47 MB 
u_stateful_df : 43104 
size of dataframe size : 3.1956922858953476 GB 
+0

我想解決的辦法是內部'update'功能,你可以發佈它的代碼? – Mariusz

+0

到Mariusz,thx爲您的關注。我添加更新函數來發布你的請求><。 –

+0

謝謝。我找不到'update'裏面的bug,我期望找到讓行更大或者內存泄漏的東西......請嘗試另一個實驗 - 而不是使用'SizeEstimator',只是將這些數據以行格式寫入文件系統(csv/json)並檢查其大小是否增長。 – Mariusz

回答

0

它看起來像內部引發一些泄漏。通常當你在數據幀調用persistcache然後count星火生成結果,並將其存儲內的分佈式內存或磁盤上,但也知道整個執行計劃,以重建數據幀丟失遺囑執行人或某事的情況。但它不應該採取這麼大的空間...

據我所知,沒有選項「崩潰」數據框(告訴星火忘記整個執行計劃)等,簡單地寫入到存儲,然後從該讀存儲。

+0

謝謝先生。 (^ 0 ^)// 有沒有解決方法來解決它? 我需要在使用spark的while循環中運行此算法來驗證sparkstreaming的結果(我也代碼spark流和它共享相同的API) –

+0

將數據寫入臨時目錄並在循環中讀取它們(而不是保留在緩存中),我想這是唯一的方法... – Mariusz