I have a problem with Apache Spark. I am running in yarn-client mode, and PySpark cannot connect to the executors.

Here is my configuration:

conf.set("spark.executor.memory", "4g") 
conf.set("spark.driver.memory", "4g") 
conf.set("spark.driver.cores", "4") 
conf.set("spark.default.parallelism", "3") 
conf.set("spark.executor.cores", "2") 
conf.set("spark.num.executor", "8") 
conf.set("spark.shuffle.io.maxRetries", "20") 

Here is my code:

I have two DataFrames, df and other_df. I first inner join the two DataFrames to get the data filtered on ID. Then I aggregate to compute the mean of CA per month (df_agg). Finally I want to collect() or take(12) the result to plot a graph. The error occurs when I call collect().

#--- inner join to select the right segment ---#
new_df = other_df.join(df, df.ID == other_df.ID)

#--- aggregate the mean per month ---#
df_agg = new_df.groupBy("month").avg("CA")

#--- collect() ---#
data = df_agg.collect()
x, y = zip(*data)
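
For the take(12) variant mentioned above, a minimal sketch (assuming df_agg keeps the column names "month" and "avg(CA)" that groupBy().avg() produces; take() pulls fewer rows to the driver but still triggers the same shuffle, so it fails the same way here):

data = df_agg.orderBy("month").take(12)   # only 12 rows reach the driver
x = [row["month"] for row in data]
y = [row["avg(CA)"] for row in data]      # "avg(CA)" is the auto-generated column name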

I got this error message from Spark:

Py4JJavaError: An error occurred while calling o824.javaToPython. 
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: 
..... 

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933) 
    at org.apache.spark.sql.DataFrame.javaToPython(DataFrame.scala:1582) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: 
...... 
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142) 
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141) 
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) 
    ... 27 more 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
    at scala.concurrent.Await$.result(package.scala:107) 
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69) 
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) 
    ... 35 more 

And these are the error messages in the Spark UI:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to <executor> 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321) 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306) 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 

I have trouble reading the error logs. I tried increasing the number of executors (up to 8) and increasing shuffle.io.maxRetries (up to 20). I also read this post, which was useful, but I have a hard time understanding which parameters to tune.
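
For illustration, these are the kinds of shuffle/network properties such posts suggest tuning (the keys are standard Spark configuration properties; the values below are placeholders, not tuned recommendations):

conf.set("spark.shuffle.io.maxRetries", "20")  # retries per failed shuffle fetch (tried above)
conf.set("spark.shuffle.io.retryWait", "30s")  # pause between shuffle fetch retries (default 5s)
conf.set("spark.network.timeout", "600s")      # blanket network timeout (default 120s)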

Especially since sometimes the job succeeds, and the next time it fails...

I then changed the configuration with:

conf.set("spark.yarn.executor.memoryOverhead", "600") 

But that did not work either.

Does anyone have a clue about this issue?

Thanks

Answer


You can set spark.sql.broadcastTimeout to a value higher than its default of 300 seconds.
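
For example, a minimal sketch; the value is in seconds and 1200 is just an illustrative choice (the trace above shows the broadcast join timing out at exactly the 300-second default):

conf.set("spark.sql.broadcastTimeout", "1200")
# or, at runtime, on an existing SQLContext:
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")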


Thanks. Would you mind elaborating on your answer? I cannot see spark.sql.broadcastTimeout in the Spark documentation... – shad
