...org.apache.spark.SparkException: Task not serializable error
val cols: Seq[String] = Seq("item", "SR", "RP")
val vecToSeq = udf((v:org.apache.spark.ml.linalg.Vector) => v.toArray)
val exprs = cols.zipWithIndex.map{ case(c,i) => $"_tmp".getItem(i).alias(c)}
val DoubleDF = result5.select(vecToSeq($"vectorCol").alias("_tmp")).select(exprs:_*)
...(Sorry, I have included everything I think is relevant, because I do not know how much code or information I should provide.)
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.mllib.recommendation.{Rating,ALS}
...
val als = new org.apache.spark.ml.recommendation.ALS().setRank(10).setMaxIter(10).setRegParam(0.01).setUserCol("user").setItemCol("item").setRatingCol("rating")
...
val itemFactors = model.itemFactors
val item = Popular.select($"item").map(line => line.getDouble(0)).take(10).map(_.toInt)
val popularFactored = itemFactors.where(item.map($"id" === _).reduce(_||_))
val exprs2 = (0 until 10).map(i => $"_tmp2".getItem(i).alias(s"z$i"))
val factored = popularFactored.select(($"features").alias("_tmp2")).select(exprs2:_*)
val AF = (0 until 10).map(i => factored.agg(avg(s"z$i")).first.getDouble(0)).toArray
val toFloat = udf((line: Seq[Float]) => line.map(_.toDouble))
val test = itemFactors.withColumn("testX",toFloat(itemFactors("features")))
val itemFactors2 = test.select($"id",$"testX")
val itemFeatures2 = itemFactors2.map { line =>
  val feature = line.getAs[Seq[Double]]("testX")
  val item = line.getAs[Int]("id")
  (item, feature.toArray)
}
val itemFeaturesR = itemFeatures2.rdd
val ItemFS = itemFeaturesR.map { case (id, factor) =>
  // Wrap this item's factor array and the averaged factors AF as jblas vectors
  val factorVector = new DoubleMatrix(factor)
  val itemVector = new DoubleMatrix(AF)
  // Yield one (id, similarity) pair per item
  val sims = cosineSimilarity(factorVector, itemVector)
  (id, sims)
}
I am now measuring cosine similarity. However, when I run the code above, I get the error "org.apache.spark.SparkException: Task not serializable":
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2056)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
... 59 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: _tmp[0] AS `item`)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@...)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(_tmp[0] AS `item`, _tmp[1] AS `SR`, _tmp[2] AS `RP`))
How can I solve this problem?
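For readers following along: the `cosineSimilarity` helper called in the `ItemFS` step is not shown in the question. Below is a minimal sketch of one possible definition. It assumes plain `Array[Double]` inputs rather than the jblas `DoubleMatrix` used in the question, and the object name `CosineSketch` is hypothetical.

```scala
object CosineSketch {
  // Cosine similarity of two equal-length vectors: dot(a, b) / (|a| * |b|).
  // Assumption: neither vector is all zeros (otherwise the result is NaN).
  def cosineSimilarity(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "vectors must have the same length")
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }
}
```

Vectors pointing in the same direction score close to 1, and orthogonal vectors score 0.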
Can you add more code? There may be several problems.
@AndreiT. The code has been updated. Please take a look.
Try marking the `exprs` variable with `@transient`.
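The `@transient` suggestion can be illustrated without Spark. A likely reading of the stack trace is that the closure passed to `map` refers to a value that lives in the same enclosing object as `exprs`, so serializing the closure drags the non-serializable `Column` list along with it. The sketch below is a stand-in under stated assumptions, not Spark code: `FakeColumn`, `Session`, `TransientSession` and `canSerialize` are hypothetical names, plain Java serialization takes the place of Spark's closure serializer, and Scala 2.12 or later is assumed.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.spark.sql.Column: deliberately NOT Serializable.
class FakeColumn(val name: String)

// Hypothetical stand-in for the enclosing object that holds both values.
class Session extends Serializable {
  val exprs: List[FakeColumn] = List(new FakeColumn("item"))
  val af: Array[Double] = Array(2.0, 3.0)

  // Reads the field `af`, so the closure captures `this` (and with it `exprs`).
  def scale: Double => Double = x => x * af(0)

  // Copies the field into a local value first, so only the array is captured.
  def scaleWithLocalCopy: Double => Double = {
    val localAf = af
    x => x * localAf(0)
  }
}

// Same shape, but `exprs` is marked @transient and is skipped during serialization.
class TransientSession extends Serializable {
  @transient val exprs: List[FakeColumn] = List(new FakeColumn("item"))
  val af: Array[Double] = Array(2.0, 3.0)
  def scale: Double => Double = x => x * af(0)
}

object ClosureDemo {
  // Returns true if plain Java serialization accepts the object.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Under those assumptions, `ClosureDemo.canSerialize(new Session().scale)` is expected to fail, while `scaleWithLocalCopy` and `new TransientSession().scale` are expected to serialize. In the question's terms this would correspond to declaring `@transient val exprs = ...`, which has not been tested against the original session.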