2016-11-22 96 views
2

在我的Spark應用程序中,我想對循環中的數據幀執行操作並將結果寫入hdfs。遞歸數據幀操作

僞代碼:

var df = emptyDataframe 
for n = 1 to 200000{ 
    someDf=read(n) 
    df = df.mergeWith(somedf) 
} 
df.writetohdfs 

在上面的例子中,我取得好成績時, 「mergeWith」 做了unionAll。但是,當在「mergeWith」中做一個(簡單)加入時,作業變得非常慢(每個核心有2個執行器,每個核心大於1h)並且永遠不會結束(作業會自行中止)。

在我的場景中,我用大約含有〜1mb的文本數據的文件進行了大約50次迭代。

因爲合併順序對我來說很重要,我懷疑這是由於DAG的產生,導致整個事情在我存儲數據的那一刻運行。

現在我正試圖在合併的數據框上使用一個.persist,但是這似乎也相當緩慢。

編輯:

由於作業運行,我注意到(儘管我做了計數和.persist)在內存中的數據幀看起來並不像一個靜態的數據幀。它看起來像是它一直在做的所有合併的一條通路,有效地減緩了工作的線性。

我有權利承擔var df是這個的罪魁禍首嗎?

spiraling out of controle

擊穿的問題,因爲我看到它:

dfA = empty 
dfC = dfA.increment(dfB) 
dfD = dfC.increment(dfN).... 

當我希望DF」 AC和d爲對象,不同的火花的東西,如果我堅持不關心或是否重新分配。 星火它看起來像這樣:

dfA = empty 
dfC = dfA incremented with df B 
dfD = ((dfA incremented with df B) incremented with dfN).... 

UPDATE2

爲了擺脫持續的DF的我能「破發」的血統轉換DF並RDD,然後再返回時,無法正常工作。 這有一點額外的開銷,但是可以接受的(工作在幾分鐘內完成,而不是幾個小時/從不) 我將對持久性進行一些更多的測試,並以解決方法的形式制定答案。

結果: 這似乎只能解決這些問題的表面。在現實中我又回到了起點,並得到OOM異常java.lang.OutOfMemoryError: GC overhead limit exceeded

+0

我不清楚這個'mergeWith'函數應該做什麼(你同時編寫union和join)。你能否包含'mergeWith'的代碼? –

+0

'mergeWith'函數可以是很多事情,當它只是一個'union all'時,我會得到很好的結果。或者它可能是這樣的: 'SELECT f。* FROM full f LEFT OUTER JOIN delta delta ON CONCAT(fa,fb)= CONCAT(ia,iz)WHERE CONCAT(ia,iz)is NULL UNION ALL SELECT d。* FROM delta d' – Havnar

+0

做一個'union'和做'join'有很大的區別。對於'union',Spark只需要寫入附加的數據,而對於'join'則必須將數據混洗。根據你的數據的大小,當然做一個'join'可能很容易出現OOM異常 - 特別是因爲你正在運行在一個很小的羣集上。 –

回答

0

因此,以下是我最終使用的。它對我的用例來說足夠高性能,它工作並且不需要持久化。

非常多的是解決方法而不是解決方法。

val mutableBufferArray = ArrayBuffer[DataFrame]() 
mutableBufferArray.append(hiveContext.emptyDataframe()) 

for loop { 

       val interm = mergeDataFrame(df, mutableBufferArray.last) 
       val intermSchema = interm.schema 
       val intermRDD = interm.rdd.repartition(8) 


       mutableBufferArray.append(hiveContext.createDataFrame(intermRDD, intermSchema)) 
       mutableBufferArray.remove(0) 

} 

我這是怎麼搏鬥鎢爲合規。 通過從DF進入RDD並返回,我最終得到了一個真實的物體,而不是從前到後整個鎢生成的過程管道。

在我的代碼中,在寫入磁盤之前迭代幾次(50-150次迭代看起來效果最好)。這就是我再次清除bufferArray以重新開始的地方。

0

如果你有這樣的代碼:

var df = sc.parallelize(Seq(1)).toDF() 

for(i<- 1 to 200000) { 
    val df_add = sc.parallelize(Seq(i)).toDF() 
    df = df.unionAll(df_add) 
} 

然後DF將有40萬個分區之後,這使得以下行動效率低下(因爲你有每個分區1個任務)。

嘗試將分區數減少到例如200(使用例如df.coalesce(200).write.saveAsTable(....)

+0

我已經合併了輸出,對不起,我沒有提到。 這裏的問題是當我更新數據框中的記錄(所以不只是附加一個聯合所有) 因此,運行時本身已經放慢了持續時間(試圖不減速) 重新分區也沒有幫助。 – Havnar

+0

澄清我的意思;堅持或 在迭代過程中重新分區......所有結果都是相同的結果(請參見問題圖片),最後只需將問題移到需要寫入HDFS的位置 – Havnar