2015-06-02 39 views
0

作爲Spark的新手,我一直在尋找他們的python example for estimation of PI如何通過Spark控制RDD的隱式緩存?

我有興趣瞭解Spark在相同環境下多次重新估計PI的性能。

我觀察到的是,在這些重新估計中,PI的值保持不變,並且性能計時似乎表明中間RDD被隱式高速緩存,然後在隨後的計算中重新使用。

有什麼辦法可以配置Spark來控制這種行爲,並且中間的RDD總是被重新生成?使用unpersist()似乎沒有效果。

我的代碼產生這個問題here在github,通過調用

`spark-submit pi2.py` 

執行得到以下結果:

No caching-0: 8000 generated 6256 in 1.14984297752 secs (PI = 3.128) 
No caching-1: 8000 generated 6256 in 0.0597329139709 secs (PI = 3.128) 
No caching-2: 8000 generated 6256 in 0.0577840805054 secs (PI = 3.128) 
No caching-3: 8000 generated 6256 in 0.0545349121094 secs (PI = 3.128) 
No caching-4: 8000 generated 6256 in 0.0544559955597 secs (PI = 3.128) 
With caching-0: 8000 generated 6256 in 0.069139957428 secs (PI = 3.128) 
With caching-1: 8000 generated 6256 in 0.0549170970917 secs (PI = 3.128) 
With caching-2: 8000 generated 6256 in 0.0531771183014 secs (PI = 3.128) 
With caching-3: 8000 generated 6256 in 0.0502359867096 secs (PI = 3.128) 
With caching-4: 8000 generated 6256 in 0.0557379722595 secs (PI = 3.128)` 
+0

我有一種感覺,你從隨機獲得同樣的價值,因爲你的種子請求 –

+1

我沒有看到在你的代碼中顯式調用'cache'。 –

+0

無論是否調用seed(),我都會得到相同的結果。然而,我把它包括在每個測試中強制產生隨機數的新種子 - 按照python文檔(9.6),它應該使用系統時間作爲種子值。 – Geoff

回答

1

有幾件事情發生在這裏。首先,你實際上並沒有緩存RDD。從你的Github鏈接:

# Now persist the intermediate result 
sc.parallelize(xrange(1, n + 1), partitions).map(f).persist() 

這會創建一個新的RDD,執行一個映射,然後持續生成RDD。你沒有提及它,所以它現在已經消失了。

接下來,第一次運行可能會比較慢,因爲Spark會將您的功能廣播給您的工作人員。所以在工作中有一些緩存,但不是用於數據,而是用於您的代碼。

最後,隨機性:seed()在您的驅動程序中播種您的RNG。種子值在首次運行時與f()一起廣播給所有工作人員(因爲種子在random()內引用)。當你現在再次調用seed()時,它會更改驅動程序中的種子,但不會更改已經發送給工作人員的函數版本,因此您會一次又一次得到相同的結果。

+1

我已更新我的代碼以反映您的意見:https://github.com/gford1000/spark-pi/blob/master/pi3.py現在按預期工作。鑑於狀態正在被轉移,random.seed()並不合適,所以我成功切換到random.SystemRandom以在每個worker上生成不同的隨機值。 – Geoff