2017-01-27 57 views
1

我有一個腳本來處理幾百GB的數據,當我嘗試處理500GB以上的數據時,我遇到了麻煩,在它低於此值的情況下它工作正常。 首先調試應用程序我得到了有關越過spark.driver.maxResultSize值限制的錯誤,所以我將此值增加到4g,現在失敗的任務正在工作,但現在我有另一個問題,當我嘗試將結果保存到一個文件拼花,任務失敗,並引發此錯誤使用sparkConf()設置2個配置值set

17/01/27 06:35:27 INFO DAGScheduler: Job 7 failed: parquet at NativeMethodAccessorImpl.java:-2, took 12.106390 s 
17/01/27 06:35:27 ERROR InsertIntoHadoopFsRelation: Aborting job. 
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 146:0 was 765207245 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize 

所以,看來我需要增加spark.akka.frameSize值

我的問題是,我已經使用函數sparkConf()。set增加maxResultSize,但我不知道如何(或語法)增加sparkConf()。set中的兩個值。

這是我在這些部分的代碼怎麼看:

conf = (SparkConf().set("spark.driver.maxResultSize", "4g")) 
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc) 

,這就是失敗的任務:

sqlContext.sql(sql).coalesce(5).write.parquet(sys.argv[3], mode='overwrite') 

只有一兩件事,我不能修改火花集羣在conf文件還有,我們使用luigi來提交任務來激發,所以我不能在腳本執行時修改spark-submit字符串(這就是爲什麼我要直接從腳本修改參數)

任何指導表示讚賞。

+1

什麼'SC = SparkContext(CONF = SparkConf()。集( 「spark.driver.maxResultSize」, 「4G」)集(「spark.akka.frameSize 」, 「256M」))'? –

+1

Oops ... _「spark.akka.frameSize ...最大郵件大小(MB)」_ >>它應該是'.set(「spark.akka.frameSize」,「256」)' –

回答

1

RTFM - 直接從Spark 1.6.3 Python API documentation ...

類pyspark。 SparkConf(...)

所有setter方法在這個類支持
對於 例如,你可以寫conf.setMaster"local").setAppName("My app")