2016-12-28 39 views
0

我們有一個小配置表(大約50000條記錄),每天更新一次。刷新緩存的數據幀?

我們有一個緩存的數據框用於這個表格,並且正在加入spark數據。在基本配置單元中加載新數據時,我們如何刷新數據框?

DataFrame tempApp = hiveContext.table("emp_data"); 

//Get Max Load-Date 
Date max_date = max_date = tempApp.select(max("load_date")).collect()[0].getDate(0); 

//Get data for latest date and cache. This will be used to join with stream data. 
DataFrame emp= hiveContext.table("emp_data").where("load_date='" + max_date + "'").cache(); 

// Get message from Kafka Stream 
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....); 

JavaDStream<MobileFlowRecord> rddMobileFlorRecs = messages.map(Record::parseFromMessage); 

kafkaRecs.foreachRDD(rdd->{DataFrame recordDataFrame = hiveContext.createDataFrame(rdd, Record.class); 

DataFrame joinedDataSet = recordDataFrame.join(emp, 
recordDataFrame.col("application").equalTo(app.col("emp_id")); 
joinedDataSet. <Do furthur processing> 
}); 

回答

0

星火自動unpersist的RDD或數據幀,如果他們不再使用。爲了知道是否緩存了RDD或Dataframe,可以進入Spark UI - > Storage tabl並查看內存詳細信息。您可以使用df.unpersist()sqlContext.uncacheTable("sparktable")uncacheTable APi從內存中刪除df或表。此選項在新的SparksessionAPi中不可用,但向後兼容始終存在。除非且直到您說出任何操作,否則Spark不會將任何數據加載或處理到RDD或DataFrame中。

因此對於您在執行join後,爲您的Dataframe執行unpersist()。這將提高性能並解決您的問題。

Databricks

+0

我懷疑我是否理解你的解決方案。緩存和取消緩存數據集可能會解決問題,但會破壞緩存的目的,因爲緩存僅適用於一次迭代。我已經添加了示例代碼以獲得更多的說明。其次,我測試了每次迭代的緩存和非緩存大約會延遲3秒。想知道是否有其他方法來實現這一目標? – Akhil

0

您可以手動完成。事情是這樣的:

DataFrame refresh(DataFrame orig) { 
    if (orig != null) { 
     orig.unpersist(); 
    } 
    DataFrame res = get the dataframe as you normally would 
    res.cache() 
    return res 

現在,一旦調用這個每天或當你想刷新這樣的:

DataFrame join_df = refresh(join_df) 

什麼這主要的作用是unpersists以前的版本(刪除緩存),讀新的,然後緩存它。所以在實踐中,數據幀被刷新。

您應該注意,數據幀只會在刷新後第一次在緩存爲惰性時使用時纔會保留在內存中。