
Spark SQL DataFrame join with rename in a loop

I am trying to compute a transitive closure on a DataFrame. After a few iterations I get an internal Spark exception. Any ideas on what causes it and how to fix it? Here is my program:

import sqlContext.implicits._  // required for toDF()

// edge list: 1 -> 2, 1 -> 3, then a chain 2 -> 4 -> 5 -> ... -> 19
val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17), (17, 18), (18, 19)) 
var edges = e.map(p => Edge(p._1, p._2)).toDF() 

// seed: all edges starting at vertex 1, columns renamed to avoid
// ambiguity in the self-join below
var filtered = edges 
    .filter("start = 1") 
    .distinct() 
    .withColumnRenamed("start", "fStart") 
    .withColumnRenamed("end", "fEnd") 

var i = 0 
while (i < 30) { 
    i = i + 1 
    println("\n i = " + i) 
    // extend each path by one edge: keep fStart, take the new edge's end
    filtered = filtered 
        .join(edges, filtered("fEnd") === edges("start")) 
        .select(filtered("fStart"), edges("end")) 
        .withColumnRenamed("start", "fStart") 
        .withColumnRenamed("end", "fEnd") 
        .distinct() 
    filtered.show() 
} 

It needs a simple case class defined at the top level:

case class Edge(start: Int, end: Int) 

And here is the output with the exception, after which Spark hangs for a while and then exits with the error Executor heartbeat timed out:

i = 1 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|     1|   4| 
+------+----+ 


i = 2 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|     1|   5| 
+------+----+ 


i = 3 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|     1|   6| 
+------+----+ 
... 

i = 10 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|     1|  13| 
+------+----+ 


i = 11 
16/01/29 00:28:59 ERROR Utils: Uncaught exception in thread driver-heartbeater 
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207) 
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219) 
at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
at org.apache.spark.util.Utils$.deserialize(Utils.scala:92) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424) 
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468) 
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468) 
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468) 
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) 
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006) 
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501) 
at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220) 
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204) 
... 32 more 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|     1|  14| 
+------+----+ 
... 

PS1. How can this kind of join be done without the column renaming? PS2. Is there any documentation on how to work with DataFrames? The API docs are very minimal.

+1

Try adding caching at the end of each iteration: 'filtered.cache()' before you call 'show' – Niemand

Answer

1

These types of errors tend to appear when the cluster does not have enough resources to serve the submitted work: the backlog of pending work keeps growing, and after a while errors like this start to show up.

To fix your problem, add filtered.cache before filtered.show.
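
For example, a minimal sketch of the loop body with the cache call added (same names as in the question; only the cache() line is new):

filtered = filtered 
    .join(edges, filtered("fEnd") === edges("start")) 
    .select(filtered("fStart"), edges("end")) 
    .withColumnRenamed("start", "fStart") 
    .withColumnRenamed("end", "fEnd") 
    .distinct() 
filtered.cache()  // cache() is lazy; the show() below materializes this 
filtered.show()   // iteration, so the next join reads the cached rows 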

Also, there will be no results after the 16th iteration, because no edge matching filtered.fEnd === edges.start is left.

+0

Indeed, adding 'filtered.cache' helped (thank you). I see that after some iterations the program keeps looping without doing anything useful, just printing this error. What I don't understand is how resources can be scarce: this is a very simple example with DataFrames of a few rows, several of them empty. Also, why does caching help here? – user2038119

+0

Also, after adding 'filtered.cache' and running the loop for more iterations, I can see that each iteration takes longer than the previous one. Why is that? After a while they all produce the same empty DataFrame. PS. @Sumit could you please point me to some documentation or YouTube videos? – user2038119

+0

The DAG can show you the exact cause, but it looks like each new iteration also reprocesses all previous iterations: for 'i = 2', for example, it will process the data for 'i = 1' and then the data for 'i = 2'. Remember that Spark persists the transformations that have to be applied to the data (_data lineage_), not the results/output of those transformations. To keep the output around you specifically need to call 'RDD.cache'. – Sumit
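
A sketch of one way to act on this inside the loop: cache and materialize each iteration's result, then rebuild the DataFrame from its RDD and schema so the accumulated query plan is cut off. This is not from the original thread; it assumes a sqlContext is in scope, and newer Spark versions offer DataFrame checkpointing for the same purpose.

filtered.cache()   // mark this iteration's result for caching 
filtered.count()   // force materialization 
// the rebuilt DataFrame's plan no longer contains all previous joins 
filtered = sqlContext.createDataFrame(filtered.rdd, filtered.schema) 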