2017-06-21 40 views
1

我有以下結構化查詢:將持續數據幀重複計算多次?

val A = 'load somedata from HDFS'.persist(StorageLevel.MEMORY_AND_DISK_SER) 
val B = A.filter('condition 1') 
val C = A.filter('condition 2') 
val D = A.filter('condition 3') 
val E = A.filter('condition 4') 
val F = A.filter('condition 5') 
val G = A.filter('condition 6') 
val H = A.filter('condition 7') 

val I = B.union(C).union(D).union(E).union(F).union(G).union(H) 

我堅持數據幀A,使得當我使用B/C/d/E/F/G/H時,A數據幀應當只計算一旦?但是,這項工作的DAG低於:

enter image description here

從上面的DAG,似乎階段6-12的所有執行和數據幀A被計算的7倍?

爲什麼會發生這種情況?

也許DAG只是假的?我發現階段7-12的頂部沒有行,其中階段6確實有兩行來自其他階段

我沒有列出所有的操作。在union操作之後,我將I數據幀保存到HDFS。對I數據幀執行此操作會使持久操作真的完成嗎?或者我必須在A數據幀上執行動作操作(例如count)以在重新使用A數據幀之前觸發持久操作?

回答

2

做下面的行不會持續你的數據集。與數據集API一起使用時,所以你必須使用觸發操作count或類似的,反過來提交星火工作緩存

val A = 'load somedata from HDFS'.persist(StorageLevel.MEMORY_AND_DISK_SER) 

緩存/持久性是懶惰。

以下所有運營商後,filter包括,應使用InMemoryTableScan在計劃中的綠點(如下圖所示)。

enter image description here

在你的情況,甚至union後的數據集I沒有被緩存,因爲你還沒有觸發緩存(而僅僅是將其標記爲緩存)。

聯合操作後,我將I數據幀保存到HDFS。對I數據幀執行此操作會使持久操作真的完成嗎?

是的。只有操作(如保存到外部存儲)才能觸發持久性以供將來重用。

或者我必須做一個動作操作,如計數在A數據幀觸發持久化操作,然後再使用A數據幀?

這就是重點!在你的情況下,因爲你想在filter運營商中重複使用A數據幀,你應該首先persistcount(觸發緩存),然後是filter

在你的情況下,沒有filter將受益於persist的性能增加。persist實際上對性能沒有任何影響,只是讓代碼審查人員認爲它不是。

如果您想查看您的數據集何時被緩存,您可以在網絡用戶界面中檢出存儲選項卡或詢問CacheManager

val nums = spark.range(5).cache 
nums.count 
scala> spark.sharedState.cacheManager.lookupCachedData(nums) 
res0: Option[org.apache.spark.sql.execution.CachedData] = 
Some(CachedData(Range (0, 5, step=1, splits=Some(8)) 
,InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
    +- *Range (0, 5, step=1, splits=8) 
)) 
+0

數據框是否像維護數據庫一樣維護數據行? – BDR