I am trying to run the Streaming KMeans algorithm included in the MLlib library. The algorithm starts up fine, but as soon as I put a file into HDFS (with hadoop fs -put ... into the folder test), Spark crashes with the following error:
15/12/06 19:50:19 ERROR JobScheduler: Error generating jobs for time 1449427819000 ms
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://route_to_hdfs/test/data_2.txt._COPYING_
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:321)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:385)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:115)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:273)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:263)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
...
More details:
data_2.txt
[0.0,0.0,0.0]
[0.1,0.1,0.1]
[0.2,0.2,0.2]
[9.0,9.0,9.0]
[9.1,9.1,9.1]
[9.2,9.2,9.2]
The code that runs it is the same as the one given in the MLlib documentation: http://spark.apache.org/docs/latest/mllib-guide.html
import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingKMeans")
val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batches

// Streaming KMeans
val training_dir = args(0) // HDFS directory monitored for training vectors
val testData_dir = args(1) // HDFS directory monitored for test points
val numDimensions = args(2).toInt
val numClusters = args(3).toInt

val trainingData = ssc.textFileStream(training_dir).map(Vectors.parse)
val testData = ssc.textFileStream(testData_dir).map(LabeledPoint.parse)

val model = new StreamingKMeans()
  .setK(numClusters)
  .setDecayFactor(1.0)
  .setRandomCenters(numDimensions, 0.0) // dimension and initial cluster weight

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
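For reference, a run against the data above could look like the line below; the class and jar names are placeholders, and the last two arguments match the 3-dimensional vectors in data_2.txt and its two apparent clusters:

spark-submit --class StreamingKMeansExample streaming-kmeans.jar hdfs://route_to_hdfs/train hdfs://route_to_hdfs/test 3 2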
Have you read the error message? It says that your file does not exist... – eliasah
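The ._COPYING_ suffix in the path is the key. hadoop fs -put first writes the data to a temporary file named <file>._COPYING_ and renames it to its final name only once the copy completes. Spark Streaming's textFileStream scans the monitored directory between batches, so it can pick up the temporary file, and by the time the job runs that file has been renamed away, hence "Input path does not exist". The Spark Streaming guide requires files to be moved atomically into the monitored directory. A minimal sketch of that workaround, assuming a staging directory /staging that lies outside the watched test folder (both paths are hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object AtomicHdfsPut {
  def main(args: Array[String]): Unit = {
    // Picks up fs.defaultFS from the cluster's core-site.xml on the classpath.
    val fs = FileSystem.get(new Configuration())

    val local   = new Path("data_2.txt")          // local file to upload
    val staging = new Path("/staging/data_2.txt") // hypothetical dir NOT watched by the stream
    val watched = new Path("/test/data_2.txt")    // directory that textFileStream monitors

    // Step 1: upload into the staging directory; the ._COPYING_ temp file
    // appears there, where the streaming job cannot see it.
    fs.copyFromLocalFile(local, staging)

    // Step 2: rename into the watched directory. A rename within HDFS is atomic,
    // so textFileStream only ever sees the complete file.
    if (!fs.rename(staging, watched)) {
      sys.error(s"rename $staging -> $watched failed")
    }

    fs.close()
  }
}

The same effect can be had from the shell: put the file into /staging first, then run hadoop fs -mv /staging/data_2.txt /test/, since a move within the same HDFS filesystem is a rename and never exposes a half-written file.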