
...org.apache.spark.SparkException: Task not serializable error

val cols: Seq[String] = Seq("item", "SR", "RP") 
val vecToSeq = udf((v:org.apache.spark.ml.linalg.Vector) => v.toArray) 
val exprs = cols.zipWithIndex.map{ case(c,i) => $"_tmp".getItem(i).alias(c)} 
val DoubleDF = result5.select(vecToSeq($"vectorCol").alias("_tmp")).select(exprs:_*) 

...(Sorry, I have included everything I think is relevant, since I'm not sure how much code or information I should provide.)

import org.apache.spark.ml.evaluation.RegressionEvaluator 
import org.apache.spark.ml.recommendation.ALS 
import org.apache.spark.mllib.recommendation.{Rating,ALS} 
... 

val als = new org.apache.spark.ml.recommendation.ALS()
  .setRank(10)
  .setMaxIter(10)
  .setRegParam(0.01)
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
... 

val itemFactors = model.itemFactors 
val item = Popular.select($"item").map(line => line.getDouble(0)).take(10).map(_.toInt) 

val popularFactored = itemFactors.where(item.map($"id" === _).reduce(_||_)) 

val exprs2 = (0 until 10).map(i => $"_tmp2".getItem(i).alias(s"z$i")) 
val factored = popularFactored.select(($"features").alias("_tmp2")).select(exprs2:_*) 

val AF = (0 until 10).map(i => factored.agg(avg(s"z$i")).first.getDouble(0)).toArray 


val toFloat = udf((line: Seq[Float]) => line.map(_.toDouble)) 
val test = itemFactors.withColumn("testX",toFloat(itemFactors("features"))) 
val itemFactors2 = test.select($"id",$"testX") 


val itemFeatures2 = itemFactors2.map { line =>
  val feature = line.getAs[Seq[Double]]("testX")
  val item = line.getAs[Int]("id")
  (item, feature.toArray)
}

val itemFeaturesR = itemFeatures2.rdd 


val ItemFS = itemFeaturesR.map { case (id, factor) =>
  val arr = new DoubleMatrix(10)
  for (i <- 0 until 10) {
    val itemVector = new DoubleMatrix(AF)
    val factorVector = factor(i)
    arr.put(i, factorVector)
    val sims = cosineSimilarity(arr.getRow(i), itemVector)
    (id, sims)
  }
}
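
(`cosineSimilarity` is not shown above; roughly, it is the usual jblas helper, something like this:)

import org.jblas.DoubleMatrix

// cosine similarity = dot product divided by the product of the L2 norms
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
  vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}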

I am now computing the cosine similarity. However, when I run the code above, I get the error "org.apache.spark.SparkException: Task not serializable":

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2056) 
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366) 
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:365) 
    ... 59 elided 
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column 
Serialization stack: 
     - object not serializable (class: org.apache.spark.sql.Column, value: _tmp[0] AS `item`) 
     - writeObject data (class: scala.collection.immutable.List$SerializationProxy) 
     - object (class scala.collection.immutable.List$SerializationProxy, [email protected]) 
     - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) 


     - object (class scala.collection.immutable.$colon$colon, List(_tmp[0] AS `item`, _tmp[1] AS `SR`, _tmp[2] AS `RP`

How can I fix this?


Can you add more code? There could be several issues. –


@AndreiT. The code has been updated. Please take a look. –


Try marking the 'exprs' variable with '@transient'. –
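
(A minimal sketch of that suggestion, assuming the same spark-shell session as above: marking the Column list `@transient` keeps the closure cleaner from serializing it along with the enclosing REPL line object.)

@transient val exprs = cols.zipWithIndex.map { case (c, i) =>
  $"_tmp".getItem(i).alias(c)
}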

Answers


Try putting with Serializable on your class definition.


"org.apache.spark.SparkException: Task not serializable" means exactly that: your class cannot be serialized. Because Spark runs computations in parallel, it has to share your code (the closures) across the cluster, and for that it needs your code to be serializable. Extend your class definition with Serializable.
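
(A minimal sketch of what I mean, with made-up class and method names; anything referenced inside an RDD closure gets serialized and shipped to the executors, so the enclosing class has to be Serializable:)

// Illustrative only: mix Serializable into the class that holds the helpers
// you call inside rdd.map { ... } closures.
class RecommenderHelpers extends Serializable {
  def cosineSimilarity(a: Array[Double], b: Array[Double]): Double = {
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }
}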


Sorry, I'm a beginner and I don't quite understand. How exactly would you change it? –
