2015-05-23 17 views
3

我加入了兩個RDD rddArddB鋤頭Spark是否計劃加入?

rddA有100個分區,rddB有500個分區。

我想了解join操作的機制。默認情況下,無論連接的順序如何,我都會得到相同的分區結構;即rddA.join(rddB)和rddB.join(rddA)產生相同數量的分區,並且通過觀察它使用較小的分區大小,100.我知道我可以通過使用rddA.join(rddB,500)來增加分區大小,但是我更關心在引擎蓋下發生了什麼以及爲什麼選擇較小的尺寸。從觀察結果來看,即使我重新劃分小型rdd,它的分區仍然會被使用; Spark是否對密鑰大小做過任何啓發式分析?

我的另一個問題是我得到的偏斜程度。我的小分區最終以3,314個條目結束,而最大分區總數達到599,911,729個(鍵),總數爲1,139,207個。兩個RDD都使用默認分區程序,那麼數據混洗是如何決定的? 我依稀記得,如果一個rdd有一個分區程序集,那麼它將被使用的分區程序。是這樣嗎?這是「推薦」嗎?

最後,請注意,我的兩個rdd s都比較大(〜90GB),因此廣播連接無濟於事。相反,任何對join操作提供一些見解的方式都可能是要走的路。

PS。關於機制左右連接的任何細節都將是額外的好處:)

回答

5

雖然我還沒有設法解釋如何派生分區,但我確實發現了數據如何洗牌(這是我最初的問題)。一個連接有一些副作用:

洗牌/分區: 星火將散列分區「RDD」鍵,移動/「工人」之間分配。給定鍵(例如5)的每組值都將以單個'工作者'/ JVM結束。這意味着如果你的'join'有一個1..N的關係,並且N嚴重傾斜,你將會得到偏斜的分區和JVM堆(即一個'Partition'可能有Max(N),另一個Min(N) )。避免這種情況的唯一方法是儘可能使用'廣播'或忍受這種行爲。由於您的數據最初將平均分配,混洗的數量將取決於密鑰散列。

重新分區: 繼「傾斜」的加入,呼籲「重新劃分」似乎分區之間均勻地重新分配數據。所以如果你有不可避免的偏斜問題,這是一件好事。請注意,這種轉換會引發沉重的洗牌,但下面的操作會更快。這樣做的缺點,雖然是不可控制的對象創建(見下文)

對象創建/堆污染: 你成功加入您的數據認爲,重新劃分將是一個好主意,重新平衡羣集,但對於一些理由,'重新分配'觸發'OOME'。會發生什麼是最初連接的數據重新使用連接的對象。當你觸發「重新分配」或任何其他涉及洗牌的「行動」時,一個額外的連接或'groupBy'(後跟一個'Action'),數據被序列化,所以你失去了對象的重用。一旦對象被反序列化,它們就成爲新的實例。另外請注意,在序列化過程中,重複使用會丟失,因此壓力會很大。所以,就我而言,1 ..1000000加入(其中1是我'沉重'的對象),將在任何觸發洗牌的操作之後失敗。

變通/調試:

  1. 我用「mapPartitionsWithIndex」調試分區大小通過返回單個項目「可迭代>」與每個分區的計數。這是非常有用的,因爲您可以在「操作」之後看到「重新分區」的效果和分區的狀態。
  2. 您可以在連接RDD上使用'countByKeyApprox'或'countByKey'來獲得基數的感覺,然後分兩步應用連接。爲您的高基數密鑰使用「廣播」,爲低基數密鑰使用「加入」。將這些操作包裝在'rdd.cache()'&'rdd.unpersist()'塊中將顯着加速此過程。雖然這可能會使代碼變得複雜一些,但它會提供更好的性能,尤其是如果您執行後續操作時。另外請注意,如果您在每個「地圖」中使用「廣播」,要進行查找,您還將顯着減少混排大小。
  3. 調用影響分區數量的其他操作的「重新分區」可能非常有用,但請注意,隨機分配的大量數據將導致更多的異常,因爲給定密鑰的大集合將創建大分區,但其他分區的大小將爲0.創建調試方法以獲取分區大小將有助於您選擇合適的大小。