我有一個需要三個輸入並執行兩個外部連接的spark任務。數據採用鍵值格式(String,Array [String])。代碼的最重要的部分是:當羣集大小很大時,Spark作業失敗,小時成功
val partitioner = new HashPartitioner(8000)
val joined = inputRdd1.fullOuterJoin(inputRdd2.fullOuterJoin(inputRdd3, partitioner), partitioner).cache
saveAsSequenceFile(joined, filter="X")
saveAsSequenceFile(joined, filter="Y")
我正在與r3.4xlarge司機節點和500個m3.xlarge工作節點上EMR工作。火花提交的參數是:
spark-submit --deploy-mode client --master yarn-client --executor-memory 3g --driver-memory 100g --executor-cores 3 --num-executors 4000 --conf spark.default.parallelism=8000 --conf spark.storage.memoryFraction=0.1 --conf spark.shuffle.memoryFraction=0.2 --conf spark.yarn.executor.memoryOverhead=4000 --conf spark.network.timeout=600s
更新:使用此設置,在火花的作業UI看到執行人的人數分別爲500(每個節點之一)
的例外,我在驅動程序的日誌中看到以下:
17/10/13 21:37:57 WARN HeartbeatReceiver: Removing executor 470 with no recent heartbeats: 616136 ms exceeds timeout 600000 ms
17/10/13 21:39:04 ERROR ContextCleaner: Error cleaning broadcast 5
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by spark.network.timeout at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
...
有些事情我試過失敗:
- 我想這個問題是因爲的T這裏有太多的執行者正在產生,驅動程序會追蹤這些執行者。我試圖通過將執行程序內存增加到4g來減少執行程序的數量。這沒有幫助。
- 我試着將驅動程序的實例類型更改爲r3.8xlarge,這也沒有幫助。
令人驚訝的是,當我將工作節點的數量減少到300時,作業將運行文件。有沒有人有任何其他假設爲什麼會發生?
- https://stackoverflow.com/questions/40474057/what-are-possible-reasons-for-receiving-timeoutexception-futures-timed-out-afte - https://stackoverflow.com/questions/45171175/錯誤錯誤清潔廣播的異常 –