您可以使用此代碼。我用Spark 1.4和1.5測試了它。
它從inputSplit
獲取文件名,並將其添加到使用iterator
採用每行的NewHadoopRDD
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.{NewHadoopRDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
val sc = new SparkContext(new SparkConf().setMaster("local"))
val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]
val path :String = "file:///home/user/test"
val text = sc.newAPIHadoopFile(path, fc ,kc, vc, sc.hadoopConfiguration)
val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit((inputSplit, iterator) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(tup => (file.getPath, tup._2))
}
)
linesWithFileNames.foreach(println)
來源
2015-12-17 08:17:39
Udy
我不認爲有一種方式,如果你使用的代碼來獲取文件名的
mapPartitionsWithInputSplit
上面的代碼片段。但是,只能通過執行'sc.wholeTextFiles(「/ path/to/dir」).keys'來獲得文件名。但我不認爲你的錯誤是由使用wholeTextFile與textFile引起的 - 這是由你之後對數據所做的。你應該發佈你的其他代碼。 – KrisP我沒有任何其他代碼,只是整個文件文件和count() –
1.目錄中有多少文件; 2.你是否先在本地機器上嘗試你的代碼3.你如何運行spark – KrisP