0
我有一個問題與Apache的火花。 我使用yarn-client模式工作。Pyspark無法連接到執行器
這是我的配置:
conf.set("spark.executor.memory", "4g")
conf.set("spark.driver.memory", "4g")
conf.set("spark.driver.cores", "4")
conf.set("spark.default.parallelism", "3")
conf.set("spark.executor.cores", "2")
conf.set("spark.num.executor", "8")
conf.set("spark.shuffle.io.maxRetries", "20")
這是我的代碼:
我有2個dataframes,DF和other_df。 我第一次內部加入2個數據框來獲取基於ID的過濾後的數據。 然後我彙總計算每月CA的平均值(df_agg)。 然後我想收集()或拿(12)繪製圖形。當我收集發生 錯誤()
#--- inner join to select the right segment ---#
new_df = (other_df
.join(df, df.ID == other_df.ID
)
)
#--- aggregate the mean per month ---#
df_agg = (new_df
.groupBy("month")
.avg('CA')
)
#--- collect() ---#
data = df_agg.collect()
x, y = zip(*data)
我得到了星火此錯誤消息:
Py4JJavaError: An error occurred while calling o824.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
.....
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.DataFrame.javaToPython(DataFrame.scala:1582)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
......
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 27 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 35 more
,這些都是在星火UI中的錯誤消息:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to <executor> +details
org.apache.spark.shuffle.FetchFailedException: Failed to connect to <executor>
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
我有閱讀錯誤日誌時遇到困難。 我試圖增加執行程序的數量(最多8個),增加shuffle.io.maxRetries(最多20個)。我也讀了this post,這很有用,但我很難理解要調整的參數。
尤其是因爲有時作業成功,下一次失敗......
我再改配置有:
conf.set("spark.yarn.executor.memoryOverhead", "600")
但它也不能工作。
有沒有人有關於這個問題的線索?
謝謝
謝謝。你介意詳細說明你的答案嗎?我在火花文檔中看不到spark.sql.boradcastTimeout ... – shad