2017-07-19 57 views
0

我正嘗試閱讀scala中Paths的Sequence文件。下面是樣本(僞)代碼:Spark:只有在路徑存在的情況下才能讀取文件

val paths = Seq[String] //Seq of paths 
val dataframe = spark.read.parquet(paths: _*) 

現在,在上面的序列中,存在一些路徑,而有些則不存在。在閱讀parquet文件(避免org.apache.spark.sql.AnalysisException: Path does not exist)時,有什麼方法可以忽略丟失的路徑嗎?

我曾嘗試以下,似乎是工作,但後來,我結束了讀同一兩次路徑這是我想避免這樣做:

val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess) 

我檢查了options方法DataFrameReader但似乎沒有任何選項類似於ignore_if_missing

而且,這些路徑可以是hdfss3(這Seq被作爲方法參數傳遞)和在閱讀,我不知道一個路徑是否是s3hdfs所以不能用s3hdfs特定API來檢查存在。

回答

1

如何過濾paths firstly`:

paths.filter(f => new java.io.File(f).exists) 

例如:

Seq("/tmp", "xx").filter(f => new java.io.File(f).exists) 
// res18: List[String] = List(/tmp) 
+0

「路徑」可以是本地的'hdfs'路徑或's3'路徑。不確定'File.exists'是否適用於's3'。 –

+1

如果路徑是HDFS/S3路徑(通常與Spark一起使用),那麼需要稍微不同的API來檢查路徑存在。 [@DarshanMehta你擊敗了我3秒:)] –

+0

@TzachZohar哈哈是的。我現在已經更新了這個問題。 –

4

您可以過濾掉不相關的文件,如@ Psidom的答案。在火花中,最好的方法是使用內部火花hadoop配置。鑑於火花會話變量被稱爲「火花」,你可以這樣做:

import org.apache.hadoop.fs.FileSystem 
import org.apache.hadoop.fs.Path 

val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) 

def testDirExist(path: String): Boolean = { 
    val p = new Path(path) 
    hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory 
} 
val filteredPaths = paths.filter(p => testDirExists(p)) 
val dataframe = spark.read.parquet(filteredPaths: _*) 
+0

根據您的系統設置,您可能需要在get:FileSystem.get(new URI(「s3:// bucket」),spark.sparkContext.hadoopConfiguration)中指定文件系統位置。否則,它可能會創建一個HDFS文件系統和barf來檢查S3文件系統的路徑。 – Azuaron

0

也許這樣的事情可以爲你工作?

def read(path: Seq[String]): Try[DataFrame] = Try(spark.read.parquet(p)) 


read("somePath") match { 
    case Success(df) => df.show() 
    case Failure(_) => Unit 
} 
相關問題