
I am trying out the Streaming KMeans algorithm included in the MLlib library. The algorithm starts up fine, but when I put a file (with the command hadoop fs -put ...) into the watched HDFS directory (folder test), Spark crashes with the following error:

15/12/06 19:50:19 ERROR JobScheduler: Error generating jobs for time 1449427819000 ms 
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://route_to_hdfs/test/data_2.txt._COPYING_ 
     at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:321) 
     at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264) 
     at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:385) 
     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:115) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
     at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:273) 
     at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:263) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
     at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
... 

More details:

data_2.txt

[0.0,0.0,0.0] 
[0.1,0.1,0.1] 
[0.2,0.2,0.2] 
[9.0,9.0,9.0] 
[9.1,9.1,9.1] 
[9.2,9.2,9.2] 
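
As a side note, the two streams in the code below are parsed differently: Vectors.parse expects bare dense vectors like the lines above, while LabeledPoint.parse (used for the test stream) expects a (label,[features]) format. A quick sanity check of both parsers:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Training lines: one bare dense vector per line
    Vectors.parse("[0.1,0.1,0.1]")
    // Test lines: (label,[features]); a bare vector here would make LabeledPoint.parse throw
    LabeledPoint.parse("(1.0,[0.1,0.1,0.1])")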

The code that runs it is the same as the one provided in the MLlib documentation: http://spark.apache.org/docs/latest/mllib-guide.html

import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingKMeans")
val ssc = new StreamingContext(conf, Seconds(1))

// Streaming KMeans
val training_dir = args(0)        // HDFS directory monitored for training files
val testData_dir = args(1)        // HDFS directory monitored for test files
val numDimensions = args(2).toInt // dimensionality of the input vectors
val numClusters = args(3).toInt   // number of cluster centers (k)

// Training lines are bare dense vectors; test lines are labeled points
val trainingData = ssc.textFileStream(training_dir).map(Vectors.parse)
val testData = ssc.textFileStream(testData_dir).map(LabeledPoint.parse)

val model = new StreamingKMeans()
  .setK(numClusters)
  .setDecayFactor(1.0)
  .setRandomCenters(numDimensions, 0.0)

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()

Have you read the error message? It says your file does not exist... – eliasah

Answer


It looks like Spark Streaming is detecting the file while it is still being copied into the HDFS test directory, as indicated by the input file name ending in ._COPYING_. However, the documentation for Spark Streaming states:

The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.

So you should first put the file into HDFS with hdfs dfs -put data_2.txt /someotherhdfsdir/, and then atomically move it into the HDFS directory monitored by your streaming application with hdfs dfs -mv /someotherhdfsdir/data_2.txt /trainingdir. The same workflow can also be done from code, as in the sketch below.
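
Here is a minimal Scala sketch of that staging-then-rename pattern using the Hadoop FileSystem API; the staging path, watched path, and local file are placeholders rather than anything from the question. On HDFS, a rename within the same filesystem is a single atomic metadata operation, so the streaming job never observes a half-copied file.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Placeholder paths: a staging directory outside the watched one,
    // and the directory monitored by textFileStream
    val staging = new Path("hdfs://route_to_hdfs/staging/data_2.txt")
    val watched = new Path("hdfs://route_to_hdfs/test/data_2.txt")

    val fs: FileSystem = staging.getFileSystem(new Configuration())

    // Step 1: copy the local file into the staging directory
    // (not atomic, but Spark is not watching this directory)
    fs.copyFromLocalFile(new Path("file:///local/path/data_2.txt"), staging)

    // Step 2: rename into the monitored directory; atomic on HDFS,
    // so the file appears there only once it is complete
    fs.rename(staging, watched)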