2017-03-24 21 views
1

我在這裏閱讀這篇文章:https://spark.apache.org/docs/latest/programming-guide.html(請參閱將函數傳遞給Spark),但我的用例是使用類型化數據集與我的案例類。我試圖使用單身對象來保存映射方法。我想知道如何打包我需要的功能來優化我的舞臺的性能(將數據集從一種類型轉換爲另一種類型,然後寫入實木複合地板)。當使用數據集,大型Java類和單例時,Spark傳遞函數

目前,階段性步驟花費了大約300萬行(〜1.5小時)的難以置信的長時間,大約880 MB數據輸出到s3實木複合地板。

我在集羣模式下運行,使用最少執行程序= 3,最大執行程序= 10,每個執行程序有4個內核,驅動程序內存8GB。

-

高層次的編碼部分:

我映射一個案例類C1到另一個案例類C2。 C1和C2有大約16個字段,各種類型,如java.sql.Timestamp,Option [String] Option [Int],String,Int,BigInt。

case class C1(field1 : _, field2 : _, field3 : _, ...) 
case class C2(field1 : _, field2 : _, field3 : _, ...) 

爲了從C1至C2映射,我需要一個非常大的java類Ĵ我是從https://github.com/drtimcooper/LatLongToTimezone複製的功能(靜態方法)。

public class J { 
    public static String getValue((float) v) = ... 
}  

我已經在一個util類裏面寫了映射函數Util,它具有許多其他有用的函數,它們被映射函數調用。

=========

基本上我的碼流是這樣的:

case class C1(field1 : _, field2 : _, field3 : _, ...) 
case class C2(field1 : _, field2 : _, field3 : _, ...) 

// very large java class J that only contains static methods 
public class J { 
    public static String getValue((float) v) = ... 

    ... 
}  

object Util { 
    def m1(i: Int): Int = ... 

    def m2(l: Option[BigDecimal], l2: Option[BigDecimal]): Int = { 
     J.getValue(l.get, l2.get) 
    } 

    ... 

    def convert_C1_to_C2(c1: C1): C2 = { 
    C2(
     field1 = m1(c1.field1), 
     field2 = m2(c1.field2, c1.field3), 
     ... 
    } 
} 

dataframe.as[C1].map(Util.convert_C1_to_C2) 
    .mode(SaveMode.Overwrite) 
    .parquet("s3a://s3Path") 

有沒有寫這個更優化的方式嗎?或者任何人都可以指出我做過這些事情的任何明顯的錯誤?看着我的代碼,我不知道爲什麼它要花很長時間才能完成任務。

我已經試過合併說16個分區來減少s3中的文件數量,但這似乎使作業運行速度慢得多。通常會有64個分區沒有任何合併。

回答

0

您可能剛剛碰到包含elsewhere的slow-fakes-s3-rename問題。在那裏討論一些修復程序。

相關問題