2016-11-16

Very slow join and "Slave lost" error in Spark: I joined two dataframes on a common column and then ran the show method:

df = df1.join(df2, df1.col1 == df2.col2, 'inner')
df.show()

The join then ran very slowly and eventually failed with the error below ("Slave lost").

Py4JJavaError: An error occurred while calling o109.showString. 

    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 : ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Slave lost 

Driver stacktrace:

org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745)

After some searching, it seems this is a memory-related issue. I then increased the repartition count to 3000, increased the executor memory, and increased the memory fraction, but still no luck: I got the same "Slave lost" error. During df.show() I noticed that one executor's shuffle write size was very high, while the others' were not. Any clues? Thanks.
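For reference, a minimal PySpark sketch of the kind of tuning described above; the partition count of 3000 comes from the question, while the memory values, configuration keys, and input paths are illustrative assumptions, not recommendations:

# Sketch only: the settings below are assumptions to be tuned for the actual cluster.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .setAppName("join-debug")
        .set("spark.executor.memory", "8g")      # assumed executor memory
        .set("spark.memory.fraction", "0.8"))    # assumed memory fraction (Spark 1.6+)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df1 = sqlContext.read.parquet("df1.parquet")     # hypothetical inputs
df2 = sqlContext.read.parquet("df2.parquet")

# Repartition on the join key so the shuffle is spread across more tasks.
df = df1.repartition(3000, df1.col1).join(df2, df1.col1 == df2.col2, 'inner')
df.show()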


Sounds like the data might be skewed - how many rows are there in each of the two dataframes? Also, what instance type are you running on, and how much memory is allocated? Could you try doing a 'count' instead of a 'show' after the 'join'? –


@GlennieHellesSindholt yes, the count went through. One dataframe is about 100 times larger than the other; the smaller DF is about 6M rows. I'm using Spark 1.6 on EC2. – newleaf


I suspected as much. I'm guessing that if you did a df = df1.join(df2, df1.col1 == df2.col2, 'inner').persist(StorageLevel.MEMORY_AND_DISK) followed by a 'df.count' and then a 'df.show', it would probably go through as well, right? –
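A runnable version of the pattern that comment suggests might look like the snippet below (a sketch; StorageLevel comes from the pyspark package, and the dataframe and column names follow the question):

# Persist the join result so count() materializes it once and show() reuses the cached data.
from pyspark import StorageLevel

df = (df1.join(df2, df1.col1 == df2.col2, 'inner')
         .persist(StorageLevel.MEMORY_AND_DISK))
df.count()   # forces the join to execute and populate the cache
df.show()    # served from the persisted result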

Answer


If using Scala:

val df = df1.join(df2, Seq("column name"))

If using PySpark:

# Join on the common column name (keeps a single copy of that column):
df = df1.join(df2, ["columnname"])

# Or join with an explicit column expression:
df = df1.join(df2, df1.columnname == df2.columnname)
display(df)

If trying to do the same with PySpark SQL:

# Register both dataframes as temporary views and query them back.
df1.createOrReplaceTempView("left_test_table")
df2.createOrReplaceTempView("right_test_table")
left = sqlContext.sql("SELECT * FROM left_test_table")
right = sqlContext.sql("SELECT * FROM right_test_table")

# Join on the shared column and drop the duplicated copy of it.
left.join(right, left.name == right.name).drop(left.name).head()
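Equivalently (a sketch, assuming the shared join column is called name, as in the snippet above), the join can be expressed entirely in SQL against the registered views. Note that selecting both l.* and r.* keeps two name columns, so in practice you would list only the columns you need or drop the duplicate as above:

# The same join written directly in Spark SQL against the temp views.
joined = sqlContext.sql(
    "SELECT l.*, r.* FROM left_test_table l JOIN right_test_table r ON l.name = r.name")
joined.show()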