2015-05-09 49 views
0

我正在使用ALS(Spark版本:1.3.1)來培訓推薦系統。現在我想通過交叉驗證使用Pipeline進行模型選擇。作爲第一步,我試着去適應the example code以及與此想出了:對於ALS,使用Spark MLlib管道不存在字段「項目」

val conf = new SparkConf().setAppName("ALS").setMaster("local") 
val sc = new SparkContext(conf) 
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._ 

val ratings: RDD[org.apache.spark.mllib.recommendation.Rating] = // ... 
val als = new ALS().setMaxIter(10).setRank(10).setRegParam(0.01) 
val pipeline = new Pipeline().setStages(Array(als)) 
val model = pipeline.fit(ratings.toDF) 

當我運行它,最後一行失敗,出現異常:

Exception in thread "main" java.lang.IllegalArgumentException: Field "item" does not exist. 
at org.apache.spark.sql.types.StructType$$anonfun$apply$25.apply(dataTypes.scala:1032) 
at org.apache.spark.sql.types.StructType$$anonfun$apply$25.apply(dataTypes.scala:1032) 
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) 
at scala.collection.AbstractMap.getOrElse(Map.scala:58) 
at org.apache.spark.sql.types.StructType.apply(dataTypes.scala:1031) 
at org.apache.spark.ml.recommendation.ALSParams$class.validateAndTransformSchema(ALS.scala:148) 
at org.apache.spark.ml.recommendation.ALS.validateAndTransformSchema(ALS.scala:229) 
at org.apache.spark.ml.recommendation.ALS.transformSchema(ALS.scala:304) 
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:142) 
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:142) 
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) 
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) 
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) 
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:142) 
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:58) 
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:100) 
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:79) 
at org.apache.spark.ml.Estimator.fit(Estimator.scala:44) 
... 

我不使用字符串"item"我的代碼中的任何地方,所以我認爲它是某種默認的。當我將.setItemCol("itemId")添加到als時,異常消息會相應更改。

"item"是什麼意思?我怎樣才能使管道工作?

回答

1

好的,解決方案其實很簡單:使用org.apache.spark.ml.recommendation.ALS.Rating而不是org.apache.spark.mllib.recommendation.Rating,它就會正常工作。

否則.setItemCol("product")會訣竅,因爲org.apache.spark.mllib.recommendation.Rating有一個名爲「product」的字段,而org.apache.spark.ml.recommendation.ALS.Rating調用相應的字段「item」。必須有一些魔法,在給定一個字符串的情況下,訪問一個case類的某個字段(反射?)。

+0

恕我直言,你不應該映射到'org.apache.spark.ml.recommendation.ALS.Rating'類,因爲你正在做雙重工作。 [code](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L366)內部映射'DataFrame '爲你評分'RDD :) :)。所以你要從評級到數據框回到評級。正如你發現的,似乎使用'setUserCol'和'setItemCol'是使用新的Spark ML API的方式。 – RAbraham