
"java.lang.NullPointerException" in clustering using Spark

I want to run K-means clustering on an input .csv file that consists of 56376 rows and two columns, where the first column holds an id and the second column holds a set of words. A sample of this data is shown below:

    1428951621 do ed version must came to milan 19 april 2013 maynardmonday 16
    1429163429 rt windeerlust sehun hyungluhan yessehun do even ed must day today

The Scala code used to process this data looks like this:

    // Assumes a spark-shell session where `sc` (SparkContext) is in scope.
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.feature.{Normalizer, Word2Vec}
    import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
    import org.apache.spark.rdd.RDD

    val inputData = sc.textFile("test.csv")

    // Tunable parameters: the number of clusters for k-means, the number
    // of k-means iterations, and the size of the vectors created by Word2Vec.
    val numClusters = 4
    val numIterations = 10
    val vectorSize = 600

    // Split each non-empty line on commas and pair the text column with
    // its tokens. Note that line(1) is used for both halves of the pair,
    // so the "key" is the text itself rather than the id in column 0.
    val filtereddata = inputData.filter(!_.isEmpty).
      map(line => line.split(",", -1)).
      map(line => (line(1), line(1).split(" ").filter(_.nonEmpty)))

    // The Word2Vec training corpus: one token sequence per line.
    val corpus = inputData.filter(!_.isEmpty).
      map(line => line.split(",", -1)).
      map(line => line(1).split(" ").toSeq)

    val values: RDD[Seq[String]] = filtereddata.map(s => s._2)
    val keys = filtereddata.map(s => s._1)

    /******************* Word2Vec and normalisation ***************************/
    val w2vec = new Word2Vec().setVectorSize(vectorSize)
    val model = w2vec.fit(corpus)

    // Map each word to its Word2Vec vector; out-of-vocabulary words fall
    // back to the zero vector.
    val outtest: RDD[Seq[Vector]] = values.map(x => x.map(m =>
      try {
        model.transform(m)
      } catch {
        case e: Exception => Vectors.zeros(vectorSize)
      }))
    val convertest = outtest.map(m => m.map(x => x.toArray))

    val withkey = keys.zip(convertest)
    val filterkey = withkey.filter(!_._2.isEmpty)

    val keysfinal = filterkey.map(x => x._1)
    val valfinal = filterkey.map(x => x._2)

    // For each collection of vectors (one tweet), add the vectors
    // element-wise, then divide each component by the array length
    // (i.e. by vectorSize, not by the number of words).
    val reducetest = valfinal.map(x => x.reduce((a, b) => a.zip(b).map(t => t._1 + t._2)))
    val filtertest = reducetest.map(x => x.map(m => m / x.length))
    val test = filtertest.map(x => new DenseVector(x).asInstanceOf[Vector])

    // Scale each averaged vector to unit norm.
    val normalizer = new Normalizer()
    val data1 = test.map(x => normalizer.transform(x))

    /********************* Clustering algorithm *******************************/
    val clusters = KMeans.train(data1, numClusters, numIterations)
    val predictions = clusters.predict(data1)
    val clustercount = keysfinal.zip(predictions).distinct.map(s => (s._2, 1)).reduceByKey(_ + _)
    val result = keysfinal.zip(predictions).distinct
    val fileToSaveResults = "kmeans-results" // output path; not defined in the original snippet
    result.saveAsTextFile(fileToSaveResults)

    // Within-set sum of squared errors: a rough clustering quality measure.
    val wsse = clusters.computeCost(data1)
    println(s"The number of clusters is $numClusters")
    println("The cluster counts are:")
    println(clustercount.collect().mkString(" "))
    println(s"The wsse is: $wsse")

However, after a few iterations it throws a "java.lang.NullPointerException" at stage 36 and exits. The error looks like this:

    17/10/07 14:42:10 INFO TaskSchedulerImpl: Adding task set 26.0 with 2 tasks 
    17/10/07 14:42:10 INFO TaskSetManager: Starting task 0.0 in stage 26.0 (TID 50, localhost, partition 0, ANY, 5149 bytes) 
    17/10/07 14:42:10 INFO TaskSetManager: Starting task 1.0 in stage 26.0 (TID 51, localhost, partition 1, ANY, 5149 bytes) 
    17/10/07 14:42:10 INFO Executor: Running task 1.0 in stage 26.0 (TID 51) 
    17/10/07 14:42:10 INFO Executor: Running task 0.0 in stage 26.0 (TID 50) 
    17/10/07 14:42:10 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir 
    17/10/07 14:42:10 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class 
    17/10/07 14:42:10 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class 
    17/10/07 14:42:10 INFO deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir 
    17/10/07 14:42:10 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 
    17/10/07 14:42:10 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 
    17/10/07 14:42:10 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 
    17/10/07 14:42:10 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 
    17/10/07 14:42:10 ERROR Executor: Exception in task 0.0 in stage 26.0 (TID 50) 
    java.lang.NullPointerException 
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) 
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) 
    

Please help me pinpoint the problem in this code, as I cannot figure it out myself. Note: the code was written by someone else.

Answer


I think this has nothing to do with your code. This exception is thrown when one of the arguments passed to ProcessBuilder is null. So I'd guess this must be a configuration problem or a bug in Hadoop.
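For illustration, here is a minimal sketch (mine, not from the question) of how ProcessBuilder.start() raises this exception when an element of its command list is null, which matches the top frame of the stack trace above:

    // A null element in the command list makes start() throw a
    // java.lang.NullPointerException before any process is launched.
    val command = java.util.Arrays.asList("echo", null: String)
    new ProcessBuilder(command).start() // throws java.lang.NullPointerException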

A quick Google search for "hadoop java.lang.ProcessBuilder.start NullPointerException" suggests this is a known issue:

    https://www.fachschaft.informatik.tu-darmstadt.de/forum/viewtopic.php?t=34250

    Is it possible to run Hadoop jobs (like the WordCount sample) in the local mode on Windows without Cygwin?
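If you are indeed running Spark locally on Windows, the commonly reported workaround (an assumption on my part, not something confirmed in this thread) is to make winutils.exe available and point Hadoop at it before the SparkContext is created, for example:

    // Hypothetical install path; adjust to wherever bin\winutils.exe lives.
    // Hadoop's Shell class reads the hadoop.home.dir system property
    // (falling back to the HADOOP_HOME environment variable) to locate
    // winutils.exe; a missing binary leaves a null in the command array
    // that Shell.runCommand passes to ProcessBuilder.start().
    System.setProperty("hadoop.home.dir", "C:\\hadoop")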


Thank you very much lexicore – Ram


@Ram Feel free to accept the answer if it helped. – lexicore


Yes, I second lexicore here.... that was exactly the problem. –