我遇到了一個奇怪的問題,在一個數據集上調用unpersist()
會影響同一代碼塊中另一個數據集的計數。不幸的是,這發生在一個複雜的長時間運行的工作中,有很多數據集,所以我不能在這裏總結整個事情。我知道這是一個棘手的問題,但讓我試着勾畫出來。我正在尋找的是一些確認,這種行爲是意想不到的,以及關於它爲什麼會發生或者我們如何避免它的任何想法。Dataset.unpersist()意外地影響了其他RDD的計數
編輯:此問題報告發生在Spark 2.1.1上,但不會在2.1.0上發生。問題是100%可重複的,但只有在我的項目中有1000行代碼和數據,我正努力嘗試將其提煉成一個簡明的示例,但尚未能夠實現,我將發佈任何更新或重新提交我的問題,如果我找到了什麼。事實上,完全相同的代碼和數據在2.1.0而不是2.1.1中起作用,這使我相信這是由於Spark內的某些原因造成的。
val claims:Dataset = // read claims from file
val accounts:Dataset = // read accounts from file
val providers:Dataset = // read providers from file
val payers:Dataset = // read payers from file
val claimsWithAccount:Dataset = // join claims and accounts
val claimsWithProvider:Dataset = // join claims and providers
val claimsWithPayer:Dataset = // join claimsWithProvider and payers
claimsWithPayer.persist(StorageLevel.MEMORY_AND_DISK)
log.info("claimsWithPayer = " + claimsWithPayer.count()) // 46
// This is considered unnecessary intermediate data and can leave the cache
claimsWithAccount.unpersist()
log.info("claimsWithPayer = " + claimsWithPayer.count()) // 41
本質上,在一系列連接主叫對中間數據集之一unpersist()
影響在後面的數據組中的一個的行數,如通過Dataset.count()
報道。
我的理解是,unpersist()
應該從緩存中刪除數據,但不應該影響其他數據集的數量或內容?這是特別令人驚訝的,因爲我在未執行其他數據之前明確堅持claimsWithPayer
。
「大致如此」並不夠好。請嘗試使用[mcve] - 它或者是__嚴重的正確性錯誤___([然後應該被報告])(https://issues.apache.org/jira/projects/SPARK/summary),並且有足夠的信息來確定問題的來源)__或者這是你的錯誤(例如假設Spark沒有提供任何保證的確定性行爲)並且朝着最小的例子努力應該給出一些提示。另外請確保您使用的是最新的次要版本。 – zero323
像許多其他Spark問題一樣,它只發生在某些數據和情況下。我花了很多時間把它簡化爲一個簡明的例子,但還沒有成功。我的問題是關於Spark的緩存語義,堅持和unpersist類似於這個問題https://stackoverflow.com/questions/29903675/understanding-sparks-caching –
所以就像我說:如果'claimsWithPayer'的每個祖先是確定性的(足夠),那麼持久性應該完全沒有影響,這是一個錯誤。否則它可能會誤解語義。這是由你來弄清楚是否是這種情況。如果您可以將問題簡化爲可管理的,我建議將其轉發給開發人員列表。如果在2.1.0和2.1.1之間發生了變化,則更有可能有人負責人將能夠識別該問題。 – zero323