2015-09-24 37 views
3

這樣做的目的是爲了在HDFS中的第二個位置操作和保存每個數據文件的副本。我將使用如何列出Spark Scala shell中HDFS位置中的所有csv文件?

RddName.coalesce(1).saveAsTextFile(pathName) 

將結果保存到HDFS。

這就是爲什麼我想要做的每個文件分開,即使我相信性能不會被視爲有效。但是,我還沒有確定如何將CSV文件路徑列表存儲到字符串數組中,然後使用單獨的RDD循環遍歷每個文件路徑。

讓我們用下面的匿名例子作爲HDFS源位置:

/data/email/click/date=2015-01-01/sent_20150101.csv 
/data/email/click/date=2015-01-02/sent_20150102.csv 
/data/email/click/date=2015-01-03/sent_20150103.csv 

我知道如何使用Hadoop FS殼牌列出文件路徑:

HDFS DFS -ls /data/email/click/*/*.csv 

我知道如何創建一個RDD的所有數據:

val sentRdd = sc.textFile("/data/email/click/*/*.csv") 

回答

3

我還沒有徹底測試過,但是像這樣ems工作:

import org.apache.spark.deploy.SparkHadoopUtil 
import org.apache.hadoop.fs.{FileSystem, Path, LocatedFileStatus, RemoteIterator} 
import java.net.URI 

val path: String = ??? 

val hconf = SparkHadoopUtil.get.newConfiguration(sc.getConf) 
val hdfs = FileSystem.get(hconf) 
val iter = hdfs.listFiles(new Path(path), false) 

def listFiles(iter: RemoteIterator[LocatedFileStatus]) = { 
    def go(iter: RemoteIterator[LocatedFileStatus], acc: List[URI]): List[URI] = { 
    if (iter.hasNext) { 
     val uri = iter.next.getPath.toUri 
     go(iter, uri :: acc) 
    } else { 
     acc 
    } 
    } 
    go(iter, List.empty[java.net.URI]) 
} 

listFiles(iter).filter(_.toString.endsWith(".csv")) 
+0

你必須使用URI的原因嗎?我可以只使用Path,返回結果是List [Path] –

+0

@MinnieShi它看不到任何原因,你不能 – zero323

0

sc.wholeTextFiles(path)should help。它提供了一個(filepath,filecontent)的rdd。

+1

雖然不會使用數據嗎?我只想遍歷每個文件路徑。 – Jaime

1

這是最終爲我工作:

import org.apache.hadoop.fs._ 
import org.apache.spark.deploy.SparkHadoopUtil 
import java.net.URI 

val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf) 
val hdfs = FileSystem.get(hdfs_conf) 
// source data in HDFS 
val sourcePath = new Path("/<source_location>/<filename_pattern>") 

hdfs.globStatus(sourcePath).foreach{ fileStatus => 
    val filePathName = fileStatus.getPath().toString() 
    val fileName = fileStatus.getPath().getName() 

    // < DO STUFF HERE> 

} // end foreach loop 
相關問題