
Spark 1.6.0 executors die with ClassCastException, causing a timeout

I'm trying to fit a Spark ML pipeline, but the executors die. The project is also on GitHub. Here is the script that doesn't work (somewhat simplified):

// Prepare data sets 
logInfo("Getting datasets") 
val emoTrainingData = sqlc.read.parquet("/tw/sentiment/emo/parsed/data.parquet") 
val trainingData = emoTrainingData 

// Configure the pipeline 
val pipeline = new Pipeline().setStages(Array(
    new FeatureReducer().setInputCol("raw_text").setOutputCol("reduced_text"), 
    new StringSanitizer().setInputCol("reduced_text").setOutputCol("text"), 
    new Tokenizer().setInputCol("text").setOutputCol("raw_words"), 
    new StopWordsRemover().setInputCol("raw_words").setOutputCol("words"), 
    new HashingTF().setInputCol("words").setOutputCol("features"), 
    new NaiveBayes().setSmoothing(0.5).setFeaturesCol("features"), 
    new ColumnDropper().setDropColumns("raw_text", "reduced_text", "text", "raw_words", "words", "features") 
)) 

// Fit the pipeline 
logInfo(s"Training model on ${trainingData.count()} rows") 
val model = pipeline.fit(trainingData) 

It runs up to the last line. It prints "Training model on xx rows", then starts fitting; the executors die, the driver stops receiving heartbeats from the executors and times out, and the script exits. It never gets past that line.

This is the exception that kills the executors:

java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207) 
    at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219) 
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.spark.util.Utils$.deserialize(Utils.scala:92) 
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436) 
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426) 
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:742) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424) 
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468) 
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468) 
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) 
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006) 
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501) 
    at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204) 
    ... 32 more 

That, in turn, later causes the timeout:

ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 142918 ms 

I uploaded the INFO-level log file here. The DEBUG log is ~500 MB.

The build file and dependencies seem to be fine:

name := "tweeather" 

version := "1.0.0" 

scalaVersion := "2.11.7" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.6.0", 
    "org.apache.spark" %% "spark-mllib" % "1.6.0", 
    "org.apache.spark" %% "spark-streaming" % "1.6.0", 
    "org.apache.hadoop" % "hadoop-client" % "2.7.1", 
    "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(), 
    "org.twitter4j" % "twitter4j-stream" % "4.0.4", 
    "org.scalaj" %% "scalaj-http" % "2.0.0", 
    "com.jsuereth" %% "scala-arm" % "1.4", 
    "edu.ucar" % "grib" % "4.6.3" 
) 

dependencyOverrides ++= Set(
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4", 
    "org.scala-lang" % "scala-compiler" % scalaVersion.value, 
    "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4", 
    "org.scala-lang.modules" %% "scala-xml" % "1.0.4", 
    "jline" % "jline" % "2.12.1" 
) 

resolvers ++= Seq(
    "Unidata Releases" at "http://artifacts.unidata.ucar.edu/content/repositories/unidata-releases/" 
) 

Answers


I still don't know what the actual cause was, but I ran the script again with only a third of the input data and it worked; it didn't fail anymore. From my observations, it crashes whenever I have more than 10,000 tasks.

I ended up coalescing my data (in another script) into 99 partitions. After running the script again, it computed everything successfully.
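
For reference, a minimal sketch of that coalescing step, assuming the same parquet input as the question and a hypothetical output path:

// Rewrite the dataset with far fewer partitions so that fitting
// the pipeline later generates far fewer tasks.
val emoData = sqlc.read.parquet("/tw/sentiment/emo/parsed/data.parquet") 
emoData 
    .coalesce(99)   // merge down to 99 partitions without a full shuffle 
    .write 
    .mode("overwrite") 
    .parquet("/tw/sentiment/emo/parsed/data_99.parquet")   // hypothetical output path 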


More information here: https://issues.apache.org/jira/browse/SPARK-12675 – aluxian


It might be a memory problem: your partitions use a lot of memory and Spark can't allocate it. Check whether there's another reference to it in the logs. –


I had the same problem, but the job didn't crash. It threw the error, yet it finished the work anyway. So this looks like a locking issue.

When I raised the configuration to use 2 processors (local[2]), it went away, as sketched below. So you may have more tasks in flight than your processes can handle.
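
A hedged sketch of that configuration change (the app name is taken from the build file; the rest of the setup is assumed):

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SQLContext 

// Use 2 local worker threads (local[2]) instead of one. 
val conf = new SparkConf() 
    .setAppName("tweeather") 
    .setMaster("local[2]") 
val sc = new SparkContext(conf) 
val sqlc = new SQLContext(sc) 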
