2017-03-05 45 views
3

我想用SBT運行Scala示例從MongoDB讀取數據。每當我嘗試訪問從Mongo讀取到RDD中的數據時,我都會收到此錯誤。java.lang.ClassNotFoundException:運行Scala MongoDB連接器時出現org.apache.spark.sql.DataFrame錯誤

Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/spark/sql/DataFrame 
at java.lang.Class.getDeclaredMethods0(Native Method) 
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) 
at java.lang.Class.getDeclaredMethod(Class.java:2128) 
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1431) 
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:72) 
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:494) 
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468) 
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 

我已經明確導入了數據框,即使它沒有在我的代碼中使用。任何人都可以解決這個問題嗎?

我的代碼:

package stream 

import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import com.mongodb.spark._ 
import com.mongodb.spark.config._ 
import com.mongodb.spark.rdd.MongoRDD 
import org.bson.Document 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.DataFrame 

object SpaceWalk { 

def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName("SpaceWalk") 
    .setMaster("local[*]") 
    .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/nasa.eva") 
    .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/nasa.astronautTotals") 

    val sc = new SparkContext(sparkConf) 
    val rdd = sc.loadFromMongoDB() 


    def breakoutCrew ( document: Document ): List[(String,Int)] = { 
    println("INPUT"+document.get("Duration").asInstanceOf[String]) 
     var minutes = 0; 
     val timeString = document.get("Duration").asInstanceOf[String] 
     if(timeString != null && !timeString.isEmpty) { 
     val time = document.get("Duration").asInstanceOf[String].split(":") 
     minutes = time(0).toInt * 60 + time(1).toInt 
     } 

     import scala.util.matching.Regex 
     val pattern = new Regex("(\\w+\\s\\w+)") 
     val names = pattern findAllIn document.get("Crew").asInstanceOf[String] 
     var tuples : List[(String,Int)] = List() 
     for (name <- names) { tuples = tuples :+ ((name, minutes)) } 

     return tuples 
    } 

    val logs = rdd.flatMap(breakoutCrew).reduceByKey((m1: Int, m2: Int) => (m1 + m2)) 

    //logs.foreach(println) 

    def mapToDocument(tuple: (String, Int) ): Document = { 
     val doc = new Document(); 
     doc.put("name", tuple._1) 
     doc.put("minutes", tuple._2) 

     return doc 
    } 

    val writeConf = WriteConfig(sc) 
    val writeConfig = WriteConfig(Map("collection" -> "astronautTotals", "writeConcern.w" -> "majority", "db" -> "nasa"), Some(writeConf)) 

    logs.map(mapToDocument).saveToMongoDB(writeConfig) 

    import org.apache.spark.sql.SQLContext 
    import com.mongodb.spark.sql._ 
    import org.apache.spark.sql.DataFrame 

    // load the first dataframe "EVAs" 
    val sqlContext = new SQLContext(sc); 
    import sqlContext.implicits._ 
    val evadf = sqlContext.read.mongo() 
    evadf.printSchema() 
    evadf.registerTempTable("evas") 

    // load the 2nd dataframe "astronautTotals" 

    val astronautDF = sqlContext.read.option("collection", "astronautTotals").mongo[astronautTotal]() 
    astronautDF.printSchema() 
    astronautDF.registerTempTable("astronautTotals") 

    sqlContext.sql("SELECT astronautTotals.name, astronautTotals.minutes FROM astronautTotals" ).show() 


    sqlContext.sql("SELECT astronautTotals.name, astronautTotals.minutes, evas.Vehicle, evas.Duration FROM " + 
     "astronautTotals JOIN evas ON astronautTotals.name LIKE evas.Crew" ).show() 
} 
} 
case class astronautTotal (name: String, minutes: Integer) 

這是我的SBT文件 -

name := "Project" 
version := "1.0"  
scalaVersion := "2.11.7"  
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" 
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0" 
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0" 
//libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.2.1" 
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.0" 
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "0.1" 

addCommandAlias("c1", "run-main stream.SaveTweets") 
addCommandAlias("c2", "run-main stream.SpaceWalk") 

outputStrategy := Some(StdoutOutput) 
//outputStrategy := Some(LoggedOutput(log: Logger))  
fork in run := true 

回答

5

此錯誤消息是因爲你使用的是僅支持星火1.x的不兼容庫您應該使用mongo-spark-connector 2.0.0+。參見:https://docs.mongodb.com/spark-connector/v2.0/

+0

謝謝。這工作。 –

+0

Thx。嘗試使用Spark Cassandra連接器時遇到同樣的問題,並且您的答案幫助解決了這一問題。 – arun

相關問題