一個星期後,以上這個問題掙扎,我想我已經找到是什麼原因導致的問題。
如果您正在使用同樣的問題掙扎,好點開始將檢查星火實例配置的罰款。關於它有一個偉大的cloudera blog post。
但是,如果問題不在於配置(這一點與我的情況),那麼問題是某處你的代碼中。問題在於,有時由於不同的原因(偏斜的連接,數據源中不均勻的分區等),您正在使用的RDD在2-3個分區上獲取大量數據,其餘分區的數據很少。
爲了降低整個網絡的數據混洗,火花嘗試,每個執行器處理該數據在該節點上本地駐留。因此,2-3位執行者正在工作很長一段時間,而其他執行者只需在幾毫秒內完成數據。這就是爲什麼我遇到了我在上述問題中描述的問題。
調試這個問題的辦法是首先的檢查RDD的分區大小。如果一個或幾個分區與其他分區相比非常大,那麼下一步就是在大分區中查找記錄,這樣您就可以知道,特別是在發生偏斜連接的情況下,哪個鍵會發生偏斜。我已經寫了一個小功能調試此:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
它給我的最小和最大分區的大小,如果這兩者之間的差值超過5倍,它打印的最大分區的5個元素,應該給你一個大概的想法。
一旦你發現問題是偏斜的分區,你可以找到一種方法來擺脫偏斜的關鍵,或者你可以重新分區你的數據框,這將迫使它得到平均分配,現在看到所有的執行者都會在相同的時間內工作,並且你會看到更少的可怕OOM錯誤,並且處理速度也會非常快。
這些僅僅是我作爲Spark新手的兩分錢,我希望Spark專家可以在這個問題上增加一些內容,因爲我認爲Spark世界中的很多新手常常面臨類似的問題。