0

Environment: Spark 1.6.0, using Scala. I can compile the program with sbt, but when I submit it, it fails with the following error: Task not serializable at aggregateByKey.

17/01/21 18:32:24 INFO net.NetworkTopology: Adding a new node: /YH11070029/10.39.0.213:50010 
17/01/21 18:32:24 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.39.0.44:41961 with 2.7 GB RAM, BlockManagerId(349, 10.39.0.44, 41961) 
17/01/21 18:32:24 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.39.2.178:48591 with 2.7 GB RAM, BlockManagerId(518, 10.39.2.178, 48591) 
Exception in thread "main" org.apache.spark.SparkException: Task not  serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:93) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:82) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:82) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1.apply(PairRDDFunctions.scala:177) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1.apply(PairRDDFunctions.scala:166) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:166) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$3.apply(PairRDDFunctions.scala:206) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$3.apply(PairRDDFunctions.scala:206) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:205) 
    at com.sina.adalgo.feature.ETL$$anonfun$13.apply(ETL.scala:190) 
    at com.sina.adalgo.feature.ETL$$anonfun$13.apply(ETL.scala:102) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 

The purpose of the code is to count the frequencies of the categorical features. The main code is as follows:

import scala.collection.mutable.HashMap
import scala.collection.parallel.mutable.ParArray

object ETL extends Serializable {
      ... ...


val cateList = featureData.map {
    case (psid: String, label: String, cate_features: ParArray[String], media_features: String) =>
      // index each categorical feature value: (featureIndex, featureValue)
      cate_features.zipWithIndex.map(x => (x._2, x._1))
}.flatMap(_.toList)

// seqop: fold one feature value into the per-key frequency map
def seqop(m: HashMap[String, Int], s: String): HashMap[String, Int] = {
    m += s -> (m.getOrElse(s, 0) + 1)
    m
}

// combop: merge two partial frequency maps
def combop(m: HashMap[String, Int], n: HashMap[String, Int]): HashMap[String, Int] = {
    for ((k, v) <- n) {
      m += k -> (m.getOrElse(k, 0) + v)
    }
    m
}

val hash = HashMap[String, Int]()
val feaFreq = cateList.aggregateByKey(hash)(seqop, combop) // RDD of (i, HashMap[String, Int]), where i is the categorical feature index

The object already extends Serializable. Why does this happen? Can you help me?

+5

Can you add the code? There is no way to see the cause of the problem from the exception alone. – maasg

+2

"Task not serializable": check whether your own code contains any non-serializable objects. –

+0

The code is posted. I checked that the object is serializable. –

Answer

0

For me, this problem typically arises in Spark when we use as an aggregation function a closure that unintentionally closes over some unneeded objects, and/or sometimes simply a function that is defined inside the main class of our Spark driver code.

I suspect this might be the case here, since your stack trace involves org.apache.spark.util.ClosureCleaner as the top-level culprit.

This is problematic because, in that case, when Spark tries to ship the function to the workers so they can do the actual aggregation, it ends up serializing much more than was actually intended: the function plus its surrounding class.
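A minimal sketch of that pattern (the Driver class and its members below are hypothetical, not taken from your code): passing a method of the driver class to map is equivalent to writing rdd.map(x => this.addOne(x)), so the whole instance, including its non-serializable SparkContext field, is pulled into the task closure.

import org.apache.spark.SparkContext

class Driver(sc: SparkContext) {
  // addOne is an instance method, so referencing it below captures `this`.
  def addOne(x: Int): Int = x + 1

  def run(): Long =
    sc.parallelize(1 to 100)
      .map(addOne)   // the closure drags in `this`, and `this` holds a SparkContext,
                     // which is not serializable => "Task not serializable"
      .count()
}

Whether or not Driver itself extends Serializable, the SparkContext it captures cannot be serialized, which is why marking the enclosing type Serializable is not enough.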

See also this post by Erik Erlandson, where some corner cases of closure serialization are well explained, as well as the Spark 1.6 notes on closures.

A quick fix might be to move the definition of the function you use in aggregateByKey to a separate object, completely independent from the rest of the code.
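A minimal sketch of that refactoring, assuming the maps are scala.collection.mutable.HashMap (the object name CateAggregators is mine, not from the post):

import scala.collection.mutable.HashMap

// Top-level object with no fields: shipping these functions to the executors
// does not drag any RDD- or SparkContext-holding driver object along with them.
object CateAggregators extends Serializable {
  def seqop(m: HashMap[String, Int], s: String): HashMap[String, Int] = {
    m += s -> (m.getOrElse(s, 0) + 1)
    m
  }

  def combop(m: HashMap[String, Int], n: HashMap[String, Int]): HashMap[String, Int] = {
    for ((k, v) <- n) m += k -> (m.getOrElse(k, 0) + v)
    m
  }
}

In the driver, the call then becomes, for example, cateList.aggregateByKey(HashMap[String, Int]())(CateAggregators.seqop, CateAggregators.combop).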

+0

Thanks. I tried moving the definition of the function used in aggregateByKey to a separate object, and it does not work. I have posted my code, and I suspect something is wrong with hash (HashMap[String, Int]). –

+0

Thanks for the feedback and the code. Try moving 'seqop' and 'combop' into a separate object, neither inside this one nor inside ETL. I think what is happening here is that the whole 'ETL' object gets pulled into serialization because it contains 'cateList' and 'feaFreq', which are RDDs, and the serialization fails. – Svend
