
Scala - Creating an IndexedDatasetSpark object

I want to run the Spark RowSimilarity recommender on data fetched from mongodb. For this I have written the code below, which takes the input from mongo and converts it into an RDD of objects. This then needs to be passed to IndexedDatasetSpark and on to SimilarityAnalysis.rowSimilarityIDS.

import org.apache.hadoop.conf.Configuration 
import org.apache.mahout.math.cf.SimilarityAnalysis 
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark 
import org.apache.spark.rdd.{NewHadoopRDD, RDD} 
import org.apache.spark.{SparkConf, SparkContext} 
import org.bson.BSONObject 
import com.mongodb.hadoop.MongoInputFormat 

object SparkExample extends App { 
    val mongoConfig = new Configuration() 
    mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection") 

    val sparkConf = new SparkConf() 
    val sc = new SparkContext("local", "SparkExample", sparkConf) 

    // Read the collection as (Object, BSONObject) pairs via mongo-hadoop
    val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
        mongoConfig, 
        classOf[MongoInputFormat], 
        classOf[Object], 
        classOf[BSONObject] 
    ) 

    // Map each document to (product_id, space-separated lower-cased attribute tokens)
    val new_doc: RDD[(String, String)] = documents.map(
        doc1 => (
            doc1._2.get("product_id").toString(), 
            doc1._2.get("product_attribute_value").toString().replace("[ \"", "").replace("\"]", "").split("\" , \"").map(value => value.toLowerCase.replace(" ", "-")).mkString(" ") 
        ) 
    ) 

    var myIDs = IndexedDatasetSpark(new_doc)(sc) 

    SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema) 
}
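Here readWriteSchema is the Mahout Schema that tells dfsWrite how to format the output text; one possible definition (the delimiter values are assumed, following the Mahout how-to-build-an-app example rather than my actual code):

    import org.apache.mahout.math.indexeddataset.Schema

    // Example write schema: "rowID<tab>colID1:score1 colID2:score2 ..." (delimiters assumed)
    val readWriteSchema = new Schema(
        "rowKeyDelim" -> "\t",
        "columnIdStrengthDelim" -> ":",
        "elementDelim" -> " ",
        "omitScore" -> false
    )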

I am unable to create an IndexedDatasetSpark that can be passed to SimilarityAnalysis.rowSimilarityIDS. Please help me solve this.

EDIT1:

I managed to create the IndexedDatasetSpark object and the code now compiles correctly. I had to pass (sc) to IndexedDatasetSpark for its implicit parameter to get past this error:

Error: could not find implicit value for parameter sc: org.apache.spark.SparkContext 

Now, when I run it, it gives the following error:

Error: could not find implicit value for parameter sc: org.apache.mahout.math.drm.DistributedContext 

I cannot figure out how to get a DistributedContext.

Is this the correct way to create the RDD and convert it into an IDS so that it can be processed by rowSimilarityIDS?

More context: I started out from this question: Run Mahout RowSimilarity recommender on MongoDB data

My build.sbt:

name := "scala-mongo" 

version := "1.0" 

scalaVersion := "2.10.6" 

libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1" 

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" 
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2" 

libraryDependencies ++= Seq(
    "org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"), 
    "org.scalatest" % "scalatest_2.10" % "1.9.2" % "test" 
) 

libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2" 
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2" 
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2" 
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2" 

resolvers += "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/" 

resolvers += Resolver.mavenLocal 

EDIT2: I temporarily removed dfsWrite to let the code execute, and ran into the following error:

java.io.NotSerializableException: org.apache.mahout.math.DenseVector 
Serialization stack: 
- object not serializable (class: org.apache.mahout.math.DenseVector, value: {3:1.0,8:1.0,10:1.0}) 
- field (class: scala.Some, name: x, type: class java.lang.Object) 
- object (class scala.Some, Some({3:1.0,8:1.0,10:1.0})) 
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

Is there some serialization step that I might have skipped?
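My guess is that Mahout's vector classes need Kryo rather than plain Java serialization. A sketch of the SparkConf settings I would try (the registrator class name is an assumption on my part, not something verified above):

    // Use Kryo for Mahout's math objects instead of Java serialization
    val sparkConf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Registrator class name assumed from the Mahout Spark bindings package
        .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")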


Did you forget to show the error? – pferrel


@pferrel: I have edited the question with the latest error. Please let me know whether I am following the right approach in Scala/Spark/Mahout. – user3295878


@pferrel: After removing dfsWrite and letting rowSimilarity run, I ran into a new issue. I have updated the question. – user3295878

Answer


I would put back whatever you removed; the second error is self-inflicted.

The original error is because you haven't created the SparkContext, which can be done with:

implicit val mc = mahoutSparkContext() 

After that, I think the implicit conversion from mc (a SparkDistributedContext) to sc (a SparkContext) will be handled by the package helper functions. If sc is still missing, try:

implicit val sc = sdc2sc(mc) 
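Concretely, a sketch of how that might slot into the question's code (untested; the mahoutSparkContext arguments are placeholders, and it is not clear from the errors exactly which call wants the DistributedContext explicitly, so it is passed to dfsWrite here):

    import org.apache.mahout.sparkbindings._

    // Mahout's distributed context wrapping Spark; master URL and app name are placeholders
    implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")
    // Explicit SparkContext, in case the sdc2sc conversion is not picked up implicitly
    implicit val sc = sdc2sc(mc)

    // Pass the contexts explicitly, matching what the two error messages ask for
    val myIDs = IndexedDatasetSpark(new_doc)(sc)
    SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)(mc)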

Thanks @pferrel. I figured out mahoutSparkContext, but I still had to pass (mc) explicitly to both functions to make it work. Should I post the final code? – user3295878


It sounds like you can answer your own question? – pferrel


I did manage to get the code working after going through this [link](https://mahout.apache.org/users/environment/how-to-build-an-app.html). I still don't know whether it is correct. Should I post my code as an answer? – user3295878