我想了解Spark如何分區數據。假設我有一個像圖中那樣的執行DAG(橙色框是舞臺)。如果RDD沒有被分區,則兩個groupBy
和join
操作應該是非常沉重的。瞭解Spark分區
那麼明智的做法是使用.partitonBy(new HashPartitioner(properValue))
來P1,P2,P3和P4避免洗牌?分區現有RDD的成本是多少?何時不適合劃分現有的RDD?如果我沒有指定分區程序,Spark不會自動分區我的數據嗎?
謝謝
我想了解Spark如何分區數據。假設我有一個像圖中那樣的執行DAG(橙色框是舞臺)。如果RDD沒有被分區,則兩個groupBy
和join
操作應該是非常沉重的。瞭解Spark分區
那麼明智的做法是使用.partitonBy(new HashPartitioner(properValue))
來P1,P2,P3和P4避免洗牌?分區現有RDD的成本是多少?何時不適合劃分現有的RDD?如果我沒有指定分區程序,Spark不會自動分區我的數據嗎?
謝謝
tl; dr分別回答您的問題:如果可以的話,最好先進行分區;可能比不分區;無論如何,你的RDD
都是以這樣或那樣的方式分區的;是。
這是一個相當廣泛的問題。它佔據了我們課程的很大一部分!但是,我們儘量在不寫小說的情況下儘可能多地處理分區問題。
如您所知,使用類似Spark這樣的工具的主要原因是因爲您有太多的數據需要在一臺機器上進行分析,而沒有將風扇看作噴氣發動機。數據分佈在集羣中所有機器上的所有核心中,所以是的,根據數據,存在默認分區。請記住,數據已經靜態分發(在HDFS,HBase等中),因此Spark默認根據默認策略進行分區,以將數據保留在已存在的機器上 - 默認分區數量相同到羣集上的核心數量。您可以通過配置spark.default.parallelism
來覆蓋此默認編號,並且您希望此編號爲每臺機器每個核心2-3個。
但是,爲了您的分析和目的,通常希望屬於一組的數據(例如,具有相同密鑰的數據,其中HashPartitioner
將應用)位於同一分區中,而不管它們從何處開始。以儘量減少後來的洗牌。 Spark還提供RangePartitioner
,或者您可以很容易地根據自己的需求推出自己的產品。但是你是對的,從默認分區到自定義分區有一個前期洗牌成本;它幾乎總是值得的。
一開始進行分區通常是明智的(而不是延遲不可避免的與partitionBy
),然後repartition
(如果稍後需要的話)。之後,您可能會選擇3210甚至導致中間洗牌,以減少分區數量,並可能會使某些機器和內核閒置,因爲網絡IO的增益(在該前期成本之後)大於CPU功率損失。
(唯一的情況是,我可以想到,從一開始就不分區的地方 - 因爲你不能 - 當你的數據源是一個壓縮文件時。)
還要注意,您可以在地圖轉換期間使用mapPartitions
和mapPartitionsWithIndex
保留分區。
最後,請記住,當你與你的分析實驗,而你的工作你的方式到規模,有診斷能力,你可以使用:
toDebugString
看到的血統RDD
小號getNumPartitions
來,令人震驚,獲得分區glom
的數清楚地看到你的數據被劃分如果您原諒無恥的插頭,這些是我們在Analytics with Apache Spark討論的事情。我們希望很快就有在線版本。
通過應用partitionBy
先發制人你不避洗牌。你只是把它推到另一個地方。如果分區RDD重複使用多次,這可能是一個好主意,但是對於一次性連接,您沒有任何收穫。
如果我沒有指定分區器,不會自動分區我的數據嗎?
它將分區(又名洗牌)您的數據連接)和隨後groupBy
的一部分(除非你保持相同的密鑰,並用它保留分區轉換)。