我在這裏閱讀這篇文章:https://spark.apache.org/docs/latest/programming-guide.html(請參閱將函數傳遞給Spark),但我的用例是使用類型化數據集與我的案例類。我試圖使用單身對象來保存映射方法。我想知道如何打包我需要的功能來優化我的舞臺的性能(將數據集從一種類型轉換爲另一種類型,然後寫入實木複合地板)。當使用數據集,大型Java類和單例時,Spark傳遞函數
目前,階段性步驟花費了大約300萬行(〜1.5小時)的難以置信的長時間,大約880 MB數據輸出到s3實木複合地板。
我在集羣模式下運行,使用最少執行程序= 3,最大執行程序= 10,每個執行程序有4個內核,驅動程序內存8GB。
-
高層次的編碼部分:
我映射一個案例類C1到另一個案例類C2。 C1和C2有大約16個字段,各種類型,如java.sql.Timestamp,Option [String] Option [Int],String,Int,BigInt。
case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)
爲了從C1至C2映射,我需要一個非常大的java類Ĵ我是從https://github.com/drtimcooper/LatLongToTimezone複製的功能(靜態方法)。
public class J {
public static String getValue((float) v) = ...
}
我已經在一個util類裏面寫了映射函數Util,它具有許多其他有用的函數,它們被映射函數調用。
=========
基本上我的碼流是這樣的:
case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)
// very large java class J that only contains static methods
public class J {
public static String getValue((float) v) = ...
...
}
object Util {
def m1(i: Int): Int = ...
def m2(l: Option[BigDecimal], l2: Option[BigDecimal]): Int = {
J.getValue(l.get, l2.get)
}
...
def convert_C1_to_C2(c1: C1): C2 = {
C2(
field1 = m1(c1.field1),
field2 = m2(c1.field2, c1.field3),
...
}
}
dataframe.as[C1].map(Util.convert_C1_to_C2)
.mode(SaveMode.Overwrite)
.parquet("s3a://s3Path")
有沒有寫這個更優化的方式嗎?或者任何人都可以指出我做過這些事情的任何明顯的錯誤?看着我的代碼,我不知道爲什麼它要花很長時間才能完成任務。
我已經試過合併說16個分區來減少s3中的文件數量,但這似乎使作業運行速度慢得多。通常會有64個分區沒有任何合併。