Pyspark: saving a dataframe to hadoop or hdfs without overflowing memory?

I'm working on a pipeline that reads a number of Hive tables and parses them into some DenseVectors for eventual use in SparkML. I want to run many iterations to find optimal training parameters, both inputs to the model and compute resources. The dataframe I'm working with is somewhere between 50 and 100 GB, spread across a dynamic number of executors on a YARN cluster.
Whenever I try to save, whether to parquet or with saveAsTable, I get a series of failed tasks before the job finally fails, along with a suggestion to raise spark.yarn.executor.memoryOverhead. Each id is a single row, no more than a few kb.
feature_df.write.parquet('hdfs:///user/myuser/featuredf.parquet', mode='overwrite', partitionBy='id')
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 98 in stage 33.0 failed 4 times, most recent failure: Lost task 98.3 in
stage 33.0 (TID 2141, rs172.hadoop.pvt, executor 441): ExecutorLostFailure
(executor 441 exited caused by one of the running tasks) Reason: Container
killed by YARN for exceeding memory limits. 12.0 GB of 12 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
The overhead is currently at 2g.
The Spark workers currently get 10 GB each, the driver (which is not on the cluster) gets 16 GB, and maxResultSize is 5 GB.
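For reference, a minimal sketch of how those settings map onto Spark configuration keys (values assumed from the description above; spark.yarn.executor.memoryOverhead takes a value in MB, and driver memory normally has to be set on spark-submit before the JVM starts rather than in code):

# Sketch of the configuration described above (values assumed from the question).
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .set('spark.executor.memory', '10g')                 # per-executor heap
        .set('spark.driver.memory', '16g')                   # driver, off-cluster
        .set('spark.driver.maxResultSize', '5g')             # cap on results collected to the driver
        .set('spark.yarn.executor.memoryOverhead', '2048'))  # off-heap allowance, in MB

spark = SparkSession.builder.config(conf=conf).getOrCreate()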
I cache the dataframe before writing it. What else can I do to troubleshoot?
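Two things that might be worth trying (a sketch, not a tested fix): persist the cached frame to disk instead of executor memory, and, since each id is a single row, drop the one-directory-per-id partitionBy in favour of a fixed repartition (the count of 200 below is an arbitrary assumption):

# Sketch: spill the cache to disk rather than executor memory,
# materialize it once, then write without per-row partitioning.
from pyspark import StorageLevel

feature_df.persist(StorageLevel.DISK_ONLY)
feature_df.count()  # force materialization before the write

(feature_df
 .repartition(200)  # arbitrary partition count; tune to the cluster
 .write
 .parquet('hdfs:///user/myuser/featuredf.parquet', mode='overwrite'))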
Edit: it seems to be trying to do all of my transformations at once. When I look at the details of the saveAsTable() method:
== Physical Plan ==
InMemoryTableScan [id#0L, label#90, features#119]
   +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter (isnotnull(id#0L) && (id#0L < 21326835))
            +- InMemoryTableScan [id#0L, label#90, features#119], [isnotnull(id#0L), (id#0L < 21326835)]
                  +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *Project [id#0L, label#90, pythonUDF0#135 AS features#119]
                           +- BatchEvalPython [<lambda>(collect_list_is#108, 56845.0)], [id#0L, label#90, collect_list_is#108, pythonUDF0#135]
                              +- SortAggregate(key=[id#0L, label#90], functions=[collect_list(indexedSegs#39, 0, 0)], output=[id#0L, label#90, collect_list_is#108])
                                 +- *Sort [id#0L ASC NULLS FIRST, label#90 ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(id#0L, label#90, 200)
                                       +- *Project [id#0L, UDF(segment#2) AS indexedSegs#39, cast(label#1 as double) AS label#90]
                                          +- *BroadcastHashJoin [segment#2], [entry#12], LeftOuter, BuildRight
                                             :- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, reka_data_long_all_files
                                             +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
                                                +- *Project [cast(entry#7 as string) AS entry#12]
                                                   +- HiveTableScan [entry#7], MetastoreRelation reka_trop50, public_crafted_audiences_sized
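If the plan really is replaying the whole transformation chain at write time, one way to break it (a sketch, with a hypothetical intermediate path) is to round-trip a mid-pipeline DataFrame through HDFS so the final write only sees the shortened lineage:

# Sketch: truncate the lineage by materializing an intermediate result
# ('intermediate.parquet' is a hypothetical path; 'spark' is the session).
mid_df = feature_df  # or whichever upstream DataFrame is expensive to recompute
mid_df.write.parquet('hdfs:///user/myuser/intermediate.parquet', mode='overwrite')
mid_df = spark.read.parquet('hdfs:///user/myuser/intermediate.parquet')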
Have you checked whether the data is skewed across the multiple iterations you're running? Here's an example of someone who faced a similar situation: http://stackoverflow.com/questions/43081465/spark-container-executor-ooms-during-reducebykey – Pushkr
No, because I don't want to start running iterations until I can read the prepared data straight from disk at the beginning of the process, so there haven't been any iterations yet. –
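For anyone who wants to rule skew out anyway, a quick sketch of a per-key count check on the long, pre-aggregation frame ('long_df' is a hypothetical name for it; the shuffle key is assumed to be id as above):

# Sketch: look for heavily over-represented keys before blaming skew.
from pyspark.sql import functions as F

(long_df
 .groupBy('id')
 .count()
 .orderBy(F.desc('count'))
 .show(20))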