在我的Spark應用程序中,我想對循環中的數據幀執行操作並將結果寫入hdfs。遞歸數據幀操作
僞代碼:
var df = emptyDataframe
for n = 1 to 200000{
someDf=read(n)
df = df.mergeWith(somedf)
}
df.writetohdfs
在上面的例子中,我取得好成績時, 「mergeWith」 做了unionAll。但是,當在「mergeWith」中做一個(簡單)加入時,作業變得非常慢(每個核心有2個執行器,每個核心大於1h)並且永遠不會結束(作業會自行中止)。
在我的場景中,我用大約含有〜1mb的文本數據的文件進行了大約50次迭代。
因爲合併順序對我來說很重要,我懷疑這是由於DAG的產生,導致整個事情在我存儲數據的那一刻運行。
現在我正試圖在合併的數據框上使用一個.persist,但是這似乎也相當緩慢。
編輯:
由於作業運行,我注意到(儘管我做了計數和.persist)在內存中的數據幀看起來並不像一個靜態的數據幀。它看起來像是它一直在做的所有合併的一條通路,有效地減緩了工作的線性。
我有權利承擔var df
是這個的罪魁禍首嗎?
擊穿的問題,因爲我看到它:
dfA = empty
dfC = dfA.increment(dfB)
dfD = dfC.increment(dfN)....
當我希望DF」 AC和d爲對象,不同的火花的東西,如果我堅持不關心或是否重新分配。 星火它看起來像這樣:
dfA = empty
dfC = dfA incremented with df B
dfD = ((dfA incremented with df B) incremented with dfN)....
UPDATE2
爲了擺脫持續的DF的我能「破發」的血統轉換DF並RDD,然後再返回時,無法正常工作。 這有一點額外的開銷,但是可以接受的(工作在幾分鐘內完成,而不是幾個小時/從不) 我將對持久性進行一些更多的測試,並以解決方法的形式制定答案。
結果: 這似乎只能解決這些問題的表面。在現實中我又回到了起點,並得到OOM異常java.lang.OutOfMemoryError: GC overhead limit exceeded
我不清楚這個'mergeWith'函數應該做什麼(你同時編寫union和join)。你能否包含'mergeWith'的代碼? –
'mergeWith'函數可以是很多事情,當它只是一個'union all'時,我會得到很好的結果。或者它可能是這樣的: 'SELECT f。* FROM full f LEFT OUTER JOIN delta delta ON CONCAT(fa,fb)= CONCAT(ia,iz)WHERE CONCAT(ia,iz)is NULL UNION ALL SELECT d。* FROM delta d' – Havnar
做一個'union'和做'join'有很大的區別。對於'union',Spark只需要寫入附加的數據,而對於'join'則必須將數據混洗。根據你的數據的大小,當然做一個'join'可能很容易出現OOM異常 - 特別是因爲你正在運行在一個很小的羣集上。 –