2017-04-08 49 views
2

我正在研究一個讀取大量配置單元表並將其解析爲一些DenseVectors以便在SparkML中最終使用的管道。我想做很多迭代來找到最佳的訓練參數,包括模型的輸入和計算資源。我正在使用的數據幀大約在50-100gb之間,分佈在YARN羣集上的動態數量的執行器上。Pyspark:將數據幀保存到hadoop或hdfs而不會溢出內存?

無論何時我試圖保存,無論是parquet還是saveAsTable,我都會在最終失敗之前得到一系列失敗的任務,並建議提高spark.yarn.executor.memoryOverhead。每個id是一行,不超過幾kb。

feature_df.write.parquet('hdfs:///user/myuser/featuredf.parquet',mode='overwrite',partitionBy='id') 

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 98 in stage 33.0 failed 4 times, most recent failure: Lost task 98.3 in 
stage 33.0 (TID 2141, rs172.hadoop.pvt, executor 441): ExecutorLostFailure 
(executor 441 exited caused by one of the running tasks) Reason: Container 
killed by YARN for exceeding memory limits. 12.0 GB of 12 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead. 

我目前在2g。

Spark工作人員當前正在獲得10GB,驅動程序(不在羣集中)獲得16GB,maxResultSize爲5GB。

我在寫入之前緩存數據幀,還能做些什麼來排除故障?

編輯:它似乎是試圖一次完成我所有的轉換。當我查看saveAsTable()方法的詳細信息時:

== Physical Plan == 
InMemoryTableScan [id#0L, label#90, features#119] 
    +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
     +- *Filter (isnotnull(id#0L) && (id#0L < 21326835)) 
      +- InMemoryTableScan [id#0L, label#90, features#119], [isnotnull(id#0L), (id#0L < 21326835)] 
        +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
         +- *Project [id#0L, label#90, pythonUDF0#135 AS features#119] 
          +- BatchEvalPython [<lambda>(collect_list_is#108, 56845.0)], [id#0L, label#90, collect_list_is#108, pythonUDF0#135] 
           +- SortAggregate(key=[id#0L, label#90], functions=[collect_list(indexedSegs#39, 0, 0)], output=[id#0L, label#90, collect_list_is#108]) 
           +- *Sort [id#0L ASC NULLS FIRST, label#90 ASC NULLS FIRST], false, 0 
            +- Exchange hashpartitioning(id#0L, label#90, 200) 
             +- *Project [id#0L, UDF(segment#2) AS indexedSegs#39, cast(label#1 as double) AS label#90] 
              +- *BroadcastHashJoin [segment#2], [entry#12], LeftOuter, BuildRight 
              :- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, reka_data_long_all_files 
              +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])) 
               +- *Project [cast(entry#7 as string) AS entry#12] 
                +- HiveTableScan [entry#7], MetastoreRelation reka_trop50, public_crafted_audiences_sized 
+0

您是否檢查過您正在執行的多次迭代過程中數據是否偏斜?這裏有一個例子,有人面臨類似的情況 - - http://stackoverflow.com/questions/43081465/spark-container-executor-ooms-during-reducebykey – Pushkr

+0

不,因爲我不想開始執行迭代,直到我可以在流程開始時直接從磁盤讀取準備好的數據,因此還沒有任何迭代。 –

回答

0

最後,我從星火用戶郵件列表得到的線索是看分區,既平衡和尺寸。正如計劃者所做的那樣,對於一個執行者實例來說太多了。將.repartition(1000)添加到創建要寫入的數據框的表達式會產生所有差異,並且可以通過在智能鍵列上創建和分區來實現更多收益。

0

我的建議是禁用動態分配。試着用下面的配置中運行它:

--master yarn-client --driver-memory 15g --executor-memory 15g --executor-cores 10 --num-executors 15 -Dspark.yarn.executor.memoryOverhead=20000 -Dspark.yarn.driver.memoryOverhead=20000 -Dspark.default.parallelism=500 
+0

沒有運氣。一切順利,直到它試圖在其中一個大型數據框上執行saveAsTable()。產生:「17/04/11 03:36:14 ERROR cluster.YarnScheduler:丟失rs209.hadoop.pvt中的執行程序4:由於超出內存限制而被YARN殺死的容器。17 GB使用17 GB物理內存。考慮提升火花。 yarn.executor.memoryOverhead 「 –

+0

-master yarn-client -driver-memory 15g -executor-memory 15g -executor-cores 10 -num-executors 15 -Dspark.yarn.executor.memoryOverhead = 100000 - Dspark.yarn.driver.memoryOverhead = 20000 -Dspark.default.parallelism = 500 –

+0

沒有變化。在編寫之前,我也注意緩存數據幀,沒有區別。 –

相關問題