2015-07-28 85 views
1

我最近開始使用Spark-Cassandra集羣(Master + 3 Workers)系統上的zeppelin來使用MLlib庫運行簡單的機器學習算法。Spark-Cassandra系統上的Zeppelin問題:Classnotfoundexception

這裏是我裝到飛艇庫:

%dep 
z.load("com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1") 
z.load("org.apache.spark:spark-core_2.10:1.4.1") 
z.load("com.datastax.cassandra:cassandra-driver-core:2.1.3") 
z.load("org.apache.thrift:libthrift:0.9.2") 
z.load("org.apache.spark:spark-mllib_2.10:1.4.0") 
z.load("cassandra-clientutil-2.1.3.jar") 
z.load("joda-time-2.3.jar") 

我試圖實現對線性迴歸的腳本。但是,當我運行它,我得到了以下錯誤消息:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.xxx.xxx.xxx): java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 
at java.net.URLClassLoader$1.run(URLClassLoader.java:372) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:361) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:360) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
at java.lang.Class.forName0(Native Method) 
at java.lang.Class.forName(Class.java:344) 
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66) 
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
... 

的問題是,該腳本不使用火花提交腳本的問題,這讓我感到困惑運行。

下面是一些代碼,我試圖執行:

import org.apache.spark.SparkContext 

import org.apache.spark.SparkConf 

import com.datastax.spark.connector._ 

import com.datastax.spark.connector.cql.CassandraConnector 

import org.apache.spark.mllib.regression.{LinearRegressionWithSGD, LinearRegressionModel, LabeledPoint} 

import org.apache.spark.rdd.RDD 



    sc.stop() 
    val conf = new SparkConf(true).set("spark.cassandra.connection.host", "xxx.xxx.xxx.xxx").setMaster("spark://xxx.xxx.xxx.xxx:7077").setAppName("DEMONSTRATION") 

    val sc = new SparkContext(conf) 

    case class Fact(numdoc:String, numl:String, year:String, creator:Double, date:Double, day:Double, user:Double, workingday:Double, total:String) 

    val data= sc.textFile("~/Input/Data.csv ») 

    val parsed = data.filter(!_.isEmpty).map {row => 
     val splitted = row.split(",") 
     val Array(nd, nl, yr)=splitted.slice(0,3) 
     val Array(cr, dt, wd, us, wod)=splitted.slice(3,8).map(_.toDouble) 
     Fact (nd, nl, yr, cr, dt, wd, us, wod, splitted(8)) 
    } 

    val class2id = parsed.map(_.total.toDouble).distinct.collect.zipWithIndex.map{case (k,v) => (k, v.toDouble)}.toMap 

    val id2class = class2id.map(_.swap) 

    val parsedData = parsed.map { i => LabaledPoint(class2id(i.total.toDouble), Array(i.creator,i.date,i.day,i.workingday)) 

    val model: LinearRegressionModel = LinearRegressionWithSGD.train(parsedData, 3) 

預先感謝您!

+0

你可以分享線性迴歸的腳本嗎?對我而言,看起來好像某些用戶代碼類未正確裝載到羣集中。 –

+0

嗨直到,當Spark嘗試將映射方法應用於數據集時,問題就開始了。我認爲工作人員無法加載額外的庫,所以我嘗試使用'addjar'和'setJars'方法並設置'SPARK_CLASSPATH'變量,但不幸的是這也不起作用。 – Med3

回答

0

我終於找到了解決方案! 事實上,我不應該在開始時停止SparkContext並創建一個新的。但是,在這種情況下,我無法訪問遠程計算機上的Cassandra,因爲默認情況下,zeppelin會使用安裝它的計算機的地址作爲Cassandra主機的地址。於是我在那裏安裝了一個新的Cassandra實例,並將其添加到我的初始集羣中,問題就解決了。