我想使用spark將大型(51GB)XML文件(在外部硬盤上)讀入數據幀(使用spark-xml plugin),執行簡單映射/過濾,重新排序它,然後將其作爲CSV文件寫回磁盤。在Spark中讀取大文件時發生內存不足錯誤2.1.0
但我總是得到一個java.lang.OutOfMemoryError: Java heap space
無論我如何調整這一點。
我想知道爲什麼不增加分區的數量停止OOM錯誤
難道不應該分裂的任務分解成多個部分,使每個人的部分較小,不會造成內存問題?
(Spark can't possibly be trying to stuff everything in memory and crashing if it doesn't fit, right??)
事情我已經嘗試:
- 重新分區/讀取和寫入時,當合併到(5000和10000個分區)數據幀(初始值是1604)
- 使用數量較少的遺囑執行人(6,4,甚至執行人我得到OOM錯誤!)
- 減少分割文件的大小(默認看起來像它的33MB)
- 給予噸RAM(我的全部)
- 增加
spark.memory.fraction
〜0.8(默認爲0.6) - 降低
spark.memory.storageFraction
〜0.2(默認爲0.5) - 設置
spark.default.parallelism
到30和40(默認爲8爲我) - 設置
spark.files.maxPartitionBytes
爲64M(默認爲128M)
我所有的代碼是在這裏(請注意,我沒有任何緩存):
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema) // defined previously
.option("rowTag", "row")
.load(s"$pathToInputXML")
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)
// filter and select only the cols i'm interested in
val dsout = df2
.where(df2.col("_TypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
).as[Post]
// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r
// more mapping
dsout
.map{
case Post(id,title,body,tags) =>
val body1 = tagPat.replaceAllIn(body,"")
val body2 = whitespacePat.replaceAllIn(body1," ")
Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)
注意
- 輸入分流真的很小(僅33MB),所以爲什麼我不能擁有8個線程處理每一個分裂?它真的不應該吹我的記憶(我瑟
UPDATE我寫的只是讀取該文件,然後forEachPartition(的println)代碼的短版本。
我得到相同的OOM錯誤:
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema)
.option("rowTag", "row")
.load(s"$pathToInputXML")
.repartition(numPartitions)
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
df
.where(df.col("_PostTypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
df("_Tags").as("tags")
).as[Post]
.map {
case Post(id, title, body, tags) =>
Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase))
}
.foreachPartition { rdd =>
if (rdd.nonEmpty) {
println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
}
}
PS:我正在使用spark v 2.1.0。我的機器有8個內核和16 GB內存。
您是否檢查過Spark UI中創建的分區的大小? – Khozzy
@Khozzy這是我用1604個分區讀取DF和50個分區寫入DF時的應用程序:[screenshot-spark-ui](http://i.imgur.com/a5LjEmc。 png) –
是的,但在作業執行過程中查看UI。你會發現每個任務執行的時間以及你的CPU的使用情況(可能有零星)。 – Khozzy