2016-03-18 64 views
1

我正在使用spark處理一個非常大的數據集。數據存儲爲avro文件。數據也被組織在一個目錄結構中(/ input/yyyy/MM/dd/HH /)。因此,例如今天的avro文件將位於/ input/2016/03/18/00至/ input/2016/03/18/23Spark創建100個空的avro文件

現在如果我處理最近2年的數據,許多avro文件被處理。

的數據處理代碼如下

val inputRDD = sc.load("/input", "com.databricks.spark.avro").rdd 
val outputRDD = inputRDD.map(foo).filter(_.isDefined).flatMap(x => x).join(anotherRDD).map { 
    case (a, (b, (c, d))) => (a, (b, c, d)) 
}.join(yetAnotherRDD).filter { 
    case (a, ((b, c, d), (e, f))) => Math.abs(a - b) <= 2000 
}.map { 
    case (a, ((b, c, d), (e, f))) => Row(a, d) 
} 
val outputDF = sc.createDataframe(outputRDD, outputSchema) 
outputDF.save(s"/output/${datePath(date)}", "com.databricks.spark.avro") 

現在,當我內部輸出去使用色相。我看到181頁,每頁上我都看到很多空的avro文件。

並非所有的文件都是空的......但有太多的空文件。

如果我不想要空文件,該怎麼辦? (不訴諸於「收集」)

回答

1

每個輸入文件至少會產生一個RDD(如果文件很大,我可能會在多個輸入序列中讀取並製作多個RDD)。 在你的應用程序中,你在這些RDD上執行過濾器,所以有可能相當多的RDD最終都是空的,因爲它們的所有行都被過濾掉了。保存DataFrame時,每個RDD都將保存到不同的PART文件中,因此一個空的RDD將生成一個空的RDD文件。 要解決此問題,請使用.coalesce(n),這將縮小RDD的數量。 因此,嘗試一下你的上線是這樣的:

outputDF.coalesce(200).save(s"/output/${datePath(date)}", "com.databricks.spark.avro") 

中聚結,用的數量是高度依賴於數據的大小。如果RDD太多,由於與驅動程序之間的通信開銷會導致很多性能損失;如果RDD太少,則可能不會使用所有可用的執行程序,這也會導致性能低於最佳性能。

+0

我檢查了,但spark 1.3.0沒有colease功能。我想他們稍後將它添加到數據框中。 –

+1

在這種情況下,您可以使用'.repartition(200)'。請注意,'.coalesce(200)'會在更高版本的Spark上執行得更好。 –