2015-07-10 84 views
1

我有一個是這樣的火花驅動程序:星火緩存策略

編輯 - 代碼的早期版本不同&沒有工作

var totalResult = ... // RDD[(key, value)] 
var stageResult = totalResult 

do { 
    stageResult = stageResult.flatMap(
    // Some code that returns zero or more outputs per input, 
    // and updates `acc` to number of outputs 
    ... 
).reduceByKey((x, y) => x.sum(y)) 

    totalResult = totalResult.union(stageResult) 
} while(stageResult.count() > 0) 

我從我的數據的性質認識這將最終終止(我本質上聚合了一個DAG中的節點)。

我不確定合理的緩存策略 - 我應該每次緩存stageResult循環嗎?我是否設置了一個可怕的遞歸塔,因爲每個totalResult都取決於它自己以前的所有化身?或者Spark會爲我指出這一點?或者我應該將每個RDD結果放在一個數組中,並在最後採取一個大的聯合?

建議在這裏受歡迎,謝謝。

+0

首先,蓄電池,當談到轉型是不可靠的。 –

+0

爲什麼不呢?我只是創建一個累加器並在地圖中使用它,然後在驅動程序中檢查它。 –

+0

在這種情況下理論上它可能在這裏工作,但由於機器故障和可能的重新計數,計數本身並不可靠。但是,既然你只關心它是否爲零,那麼它可能會適合你。另一個說明,但。如果這是你的最終代碼,那麼這將不會發生任何事情。有沒有行動正在運行,所以計算(因此累加器將永遠不會觸發) –

回答

2

我想改寫這個如下:

do { 
    stageResult = stageResult.flatMap(
    //Some code that returns zero or more outputs per input 
).reduceByKey(_+_).cache 

    totalResult = totalResult.union(stageResult) 
} while(stageResult.count > 0) 

我相當肯定(95%),其DAG中union使用的stageResult將是正確的引用(尤其是數應觸發它),但是這可能需要重新檢查。

然後當您撥打totalResult.ACTION時,它會將所有緩存的數據放在一起。

ANSWER基於新的問題

只要你擁有的內存空間,然後我的確會cache一切前進的道路上,因爲它存儲每個stageResultunion荷蘭國際集團所有的數據點在數據結束。事實上,每個工會都不依賴於過去,因爲這不是RDD.union的語義,它只是把它們放在一起。由於RDD不變性,您可以輕鬆更改代碼以使用val

最後一點,也許DAG可視化將有助於理解爲什麼就不會有遞歸的後果:

DAG

+0

但我不想調用count,因爲這會觸發RDD的額外迭代,對吧? –

+0

你需要調用'count'或者'while'中的代碼永遠不會做任何事情。而且,如果你正在緩存數據,那麼額外的迭代應該大大減少。 –

+0

我知道我需要打電話給我。我的問題是關於緩存如何工作的細節。 –