2015-12-16 119 views
4

我讀使用下面的代碼文件的目錄:如何用spark sc.textFile獲取文件名?

val data = sc.textFile("/mySource/dir1/*") 

現在我data RDD包含的所有文件中的所有行的目錄(右?)

我現在想添加列每行都帶有源文件名,我該怎麼做?

我試過的其他選項是使用wholeTextFile,但我不斷收到內存異常。 5臺服務器24核24 GB(executor-core 5 executor-memory 5G) 有什麼想法?

+0

我不認爲有一種方式,如果你使用的代碼來獲取文件名的mapPartitionsWithInputSplit上面的代碼片段。但是,只能通過執行'sc.wholeTextFiles(「/ path/to/dir」).keys'來獲得文件名。但我不認爲你的錯誤是由使用wholeTextFile與textFile引起的 - 這是由你之後對數據所做的。你應該發佈你的其他代碼。 – KrisP

+0

我沒有任何其他代碼,只是整個文件文件和count() –

+0

1.目錄中有多少文件; 2.你是否先在本地機器上嘗試你的代碼3.你如何運行spark – KrisP

回答

7

您可以使用此代碼。我用Spark 1.4和1.5測試了它。

它從inputSplit獲取文件名,並將其添加到使用iterator採用每行的NewHadoopRDD

import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat} 
import org.apache.spark.rdd.{NewHadoopRDD} 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.hadoop.io.LongWritable 
import org.apache.hadoop.io.Text 

val sc = new SparkContext(new SparkConf().setMaster("local")) 

val fc = classOf[TextInputFormat] 
val kc = classOf[LongWritable] 
val vc = classOf[Text] 

val path :String = "file:///home/user/test" 
val text = sc.newAPIHadoopFile(path, fc ,kc, vc, sc.hadoopConfiguration) 

val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]] 
      .mapPartitionsWithInputSplit((inputSplit, iterator) => { 
    val file = inputSplit.asInstanceOf[FileSplit] 
    iterator.map(tup => (file.getPath, tup._2)) 
    } 
) 

linesWithFileNames.foreach(println)