2016-12-05 75 views
1

我有一個關於火花的非常基本的問題。我通常使用50個核心運行火花作業。在查看工作進度的同時,大多數時候它顯示50個並行運行的進程(按照它應該這樣做),但有時它只顯示2個或4個並行運行的進程。就像這樣:火花核心與任務併發

[Stage 8:================================>      (297 + 2)/500] 

的RDD的正在處理的是repartitioned 100多個分區。所以這不應該是一個問題。

雖然我有一個意見。我已經看到了大多數情況發生的模式,SparkUI中的數據局部性顯示爲NODE_LOCAL,而其他時候所有50個進程都在運行,其中一些進程顯示爲RACK_LOCAL。 這讓我懷疑,也許會發生這種情況,因爲在同一節點中處理數據之前會緩存數據以避免網絡開銷,並且這會降低進一步的處理速度。

如果是這樣的話,什麼是避免它的方式。如果情況並非如此,那麼這裏發生了什麼?

回答

1

一個星期後,以上這個問題掙扎,我想我已經找到是什麼原因導致的問題。

如果您正在使用同樣的問題掙扎,好點開始將檢查星火實例配置的罰款。關於它有一個偉大的cloudera blog post

但是,如果問題不在於配置(這一點與我的情況),那麼問題是某處你的代碼中。問題在於,有時由於不同的原因(偏斜的連接,數據源中不均勻的分區等),您正在使用的RDD在2-3個分區上獲取大量數據,其餘分區的數據很少。

爲了降低整個網絡的數據混洗,火花嘗試,每個執行器處理該數據在該節點上本地駐留。因此,2-3位執行者正在工作很長一段時間,而其他執行者只需在幾毫秒內完成數據。這就是爲什麼我遇到了我在上述問題中描述的問題。

調試這個問題的辦法是首先的檢查RDD的分區大小。如果一個或幾個分區與其他分區相比非常大,那麼下一步就是在大分區中查找記錄,這樣您就可以知道,特別是在發生偏斜連接的情況下,哪個鍵會發生偏斜。我已經寫了一個小功能調試此:

from itertools import islice 
def check_skewness(df): 
    sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing 
    l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect() 
    max_part = max(l,key=lambda item:item[1]) 
    min_part = min(l,key=lambda item:item[1]) 
    if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times 
     print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n' 
     print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5)  if i == max_part[0] else []).take(5)) 
    else: 
     print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part 

它給我的最小和最大分區的大小,如果這兩者之間的差值超過5倍,它打印的最大分區的5個元素,應該給你一個大概的想法。

一旦你發現問題是偏斜的分區,你可以找到一種方法來擺脫偏斜的關鍵,或者你可以重新分區你的數據框,這將迫使它得到平均分配,現在看到所有的執行者都會在相同的時間內工作,並且你會看到更少的可怕OOM錯誤,並且處理速度也會非常快。

這些僅僅是我作爲Spark新手的兩分錢,我希望Spark專家可以在這個問題上增加一些內容,因爲我認爲Spark世界中的很多新手常常面臨類似的問題。