2015-11-10 48 views
7

我想在包含avro文件的一些生成的S3路徑上運行spark工作(spark v1.5.1)。我正在加載它們:如何讓火花忽略缺少的輸入文件?

val avros = paths.map(p => sqlContext.read.avro(p)) 

雖然有些路徑不存在。我如何能夠忽略那些空洞的路徑?以前我使用過this answer,但我不確定如何在新的數據幀API中使用它。

注意:我理想的是尋找類似的方法來鏈接答案,只是使輸入路徑可選。我不是特別想要在S3中明確檢查路徑的存在(因爲這很麻煩並且可能會使開發變得笨拙),但是如果現在沒有乾淨的方法來實現這一點,我想這就是我的後備。

回答

9

我會使用scala Try類型來處理在讀取avro文件目錄時出現故障的可能性。使用'嘗試'我們可以使我們的代碼中顯式失敗的可能性,並以功能方式處理:

object Main extends App { 

    import scala.util.{Success, Try} 
    import org.apache.spark.{SparkConf, SparkContext} 
    import com.databricks.spark.avro._ 

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("example")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    //the first path exists, the second one doesn't 
    val paths = List("/data/1", "/data/2") 

    //Wrap the attempt to read the paths in a Try, then use collect to filter 
    //and map with a single partial function. 
    val avros = 
    paths 
     .map(p => Try(sqlContext.read.avro(p))) 
     .collect{ 
     case Success(df) => df 
     } 
    //Do whatever you want with your list of dataframes 
    avros.foreach{ df => 
    println(df.collect()) 
    } 
    sc.stop() 
} 
+0

從spark文檔:'collect()'可能導致驅動程序內存不足,儘管如此,因爲collect()會將整個RDD提取到一臺機器上。有沒有不使用'collect()'的解決方案?這是一個非常大的數據集。 – jbrown

+2

當在RDD上調用collect()時,情況就是如此。在我第一次調用'collect(...)'的地方,包含一個部分函數,​​它在一個RDD列表上,它是List上的collect函數,而不是任何RDD。這相當於做一個'map'和'filter'。我最後在'foreach'的末尾再次使用'collect()',但這只是操作RDD列表的一個例子,我不認爲這是你在自己的應用程序中會做的事情,但是我會需要一個簡單的結尾來看看這種方法是否正常工作。 – mattinbits

+0

哦,好的。我會試一試,看看它是否有效。我認爲第一個「collect」正在評估RDD並將所有數據發送到驅動程序節點。 – jbrown