2017-04-11 57 views
0

我是相對較新的火花,甚至可能是錯誤的,在完成建立場景問題之前,請隨時跳過閱讀並指出您發現的位置,我在概念上是錯誤的,謝謝!應該緩存轉換依賴關係圖上的共同祖先嗎?

設想一段的驅動程序代碼是這樣的:

val A = ... (some transformation) 
val B = A.filter(fun1) 
val C = A.filter(fun2) 
... 
B.someAction()... //do sth with B 
... 
C.someAction()... //do sth with C 

轉化RDDS B和C都依賴於A,其本身可能是一個複雜的變換。那麼A會被計算兩次?我認爲它會因爲火花不能做任何事情,inter -transformations,對吧? Spark在智能優化一次轉換執行時間爲,因爲其中的捆綁任務可以進行全面分析。例如,某些狀態變化可能發生在B.someAction之後但在C.someAction之前,這可能會影響A的值,因此需要重新計算。又例如這可能發生這樣的:

val arr = Array(...) 
val A = sc.parallelize(...).flatMap(e => arr.map(_ * e)) //now A depends on some local array 

... //B and C stays the same as above 

B.someAction() 
... 
arr(i) = arr(i) + 10 //local state modified 
... 
C.someAction() //should A be recomputed? YES 

這是很容易驗證,所以我做了一個快速的實驗,其結果支持我的推理。

但是,如果B和C只是獨立依賴於A,並且不存在像上面那樣的其他邏輯,那麼程序員或某種工具可以靜態分析代碼並說可以在A上添加一個緩存,這樣它就不會不必要地重新計算!但星星之火可以什麼都不做這件事,有時甚至難以被人以決定:

val A = ... (some transformation) 
var B = A.filter(fun1) 
var C: ??? = null 
var D: ??? = null 

if (cond) { 
    //now whether multiple dependencies exist is runtime determined 
    C = A.filter(fun2) 
    D = A.filter(fun3) 
} 

B.someAction()... //do sth with B 

if (cond) { 
    C.someAction()... //do sth with C 
    D.someAction()... //do sth with D 
} 

如果條件爲真,那麼這是很有誘惑力的高速緩存A,但你永遠不會知道,直到運行時。我知道這是一個人造的蹩腳的例子,但這些已經是簡化的模型,事情在實踐中可能會變得更加複雜,並且依賴關係可能會很長並且隱含並且跨模塊傳播,所以我的問題是處理這類問題的一般原則是什麼。應該何時緩存轉換依賴關係圖上的共同祖先(提供的內存不是問題)?

我想聽到的東西就像始終遵循功能的編程範式做火花或總是緩存他們,如果你能然而有,我可能不需要另一種情況:

val A = ... (some transformation) 
val B = A.filter(fun1) 
val C = A.filter(fun2) 
... 
B.join(C).someAction() 

再次B和C都依賴在A上,而不是分別調用兩個動作,他們被連接起來形成一個單一的轉換。這一次我相信火花足夠聰明,可以精確地計算一次。還沒有找到適當的方式來運行和檢查,但應該在Web UI DAG中顯而易見。更進一步,我認爲火花甚至可以將兩個過濾器操作減少爲A中的一個遍歷,以同時獲得B和C.這是真的?

回答

1

這裏有很多東西需要解壓。

轉換RDD B和C都依賴於A本身可能是一個複雜的轉換。那麼A會被計算兩次?我認爲這將是因爲火花不能做任何相互轉化的事情,對吧?

是的,它會被計算兩次,除非您調用A.cache()A.persist(),在這種情況下,它將只計算一次。

例如,它可能是某些國家發生變化B.someAction之後,但之前C.someAction可能影響,因此重新計算的數值是必需的

不,這是不正確的,A是不可變的,因此它的狀態不能改變。 BC也是不變的RDD,它們表示A的轉換。

sc.parallelize(...).flatMap(e => arr.map(_ * e)) //now A depends on some local array 

不,它不依賴於本地陣列上,它是包含(駕駛員)本地數組的元素的副本的不可變RDD。如果數組更改,A不會更改。要獲得該行爲,您必須var A = sc. parallelize(...),然後在本地數組更改A = sc.paralellize(...)時再次設置A.在這種情況下,A未被更新,而是被本地陣列的新RDD表示替代,因此A的任何緩存版本都是無效的。

您發佈的後續示例將從緩存A中受益。再次因爲RDD是不可變的。

+0

謝謝達倫。所以我從中得到的結論是,在潛在的重新計算的情況下緩存RDD總是有意義的。 RDD是不可變的,可以看作涉及(本地)數據的包,以及對其創建快照時其他RDD和指令的引用。但我仍然無法理解我在本地火花外殼中所做的實驗:scala> val arr = Array(1,2,3)
> val A = sc.parallelize(Array(1,2,3))。flatMap (a => arr.map(_ * a)) > A.collect res0 = Array(1,2,3,2,4,6,3,6,9) > arr(0)= 100 > A.collect res2 =數組(100,2,3,200,4,6,300,6,9) – user1206899

+1

緩存多次使用的RDD將阻止重新計算,這對緩存是否有意義它將取決於用例和可用資源。關於你的實驗,我認爲這是使用REPL的副作用。 – ImDarrenG

+0

對不起,這裏的評論不能包含換行符。這意味着第二個* collect *結果發生了變化,這意味着必須創建一些新的RDD。但我一直使用相同的「val A」,所以不知道它是如何發生的。 – user1206899