2015-12-18 23 views
1

當使用spark-avro庫加入從avro文件創建的數據框時,我遇到了性能問題。加入在Spark上創建物理計劃中的CartesianProduct的數據框1.5.2

數據幀是從120K的avro文件創建的,總大小約爲1.5TB。 這兩個數據幀非常龐大,數十億條記錄。

這兩個DataFrame的連接永遠運行。 此過程在具有300個執行器並具有4個執行器內核和8GB內存的紗線羣上運行。

有關此加入的任何見解都將有所幫助。我已經在下面發佈瞭解釋計劃。 我注意到物理計劃中的CartesianProduct。我想知道這是否會導致性能問題。

以下是邏輯計劃和實際計劃。 (由於保密性質,我無法在此發佈任何列名或文件名)

== Optimized Logical Plan == 
Limit 21 
Join Inner, [ Join Conditions ] 
    Join Inner, [ Join Conditions ] 
    Project [ List of columns ] 
    Relation [ List of columns ] AvroRelation[ fileName1 ] -- large file - .5 billion records 
    InMemoryRelation [List of columns ], true, 10000, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None 
    Project [ List of Columns ] 
    Relation[ List of Columns] AvroRelation[ filename2 ] -- another large file - 800 million records 

== Physical Plan == 
Limit 21 
Filter (filter conditions) 
    CartesianProduct 
    Filter (more filter conditions) 
    CartesianProduct 
    Project (selecting a few columns and applying a UDF to one column) 
     Scan AvroRelation[avro file][ columns in Avro File ] 
    InMemoryColumnarTableScan [List of columns ], true, 10000, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None) 
    Project [ List of Columns ] 
    Scan AvroRelation[Avro File][List of Columns] 

Code Generation: true 

代碼如下所示。

val customerDateFormat = new SimpleDateFormat(「yyyy/MM/dd」);

val dates = new RetailDates() 
val dataStructures = new DataStructures() 

// Reading CSV Format input files -- retailDates 
// This DF has 75 records 
val retailDatesWithSchema = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("delimiter", ",") 
    .schema(dates.retailDatesSchema) 
    .load(datesFile) 
    .coalesce(1) 
    .cache() 

// Create UDF to convert String to Date 
val dateUDF: (String => java.sql.Date) = (dateString: String) => new java.sql.Date(customerDateFormat.parse(dateString).getTime()) 
val stringToDateUDF = udf(dateUDF) 

// Reading Avro Format Input Files 
// This DF has 500 million records 
val userInputDf = sqlContext.read.avro(「customerLocation") 
val userDf = userInputDf.withColumn("CAL_DT", stringToDateUDF(col("CAL_DT"))).select(
         "CAL_DT","USER_ID","USER_CNTRY_ID" 
        ) 

val userDimDf = sqlContext.read.avro(userDimFiles).select("USER_ID","USER_CNTRY_ID","PRIMARY_USER_ID") // This DF has 800 million records 

val retailDatesWithSchemaBroadcast = sc.broadcast(retailDatesWithSchema) 
val userDimDfBroadcast = sc.broadcast(userDimDf) 

val userAndRetailDates = userDnaSdDf 
    .join((retailDatesWithSchemaBroadcast.value).as("retailDates"), 
    userDf("CAL_DT") between($"retailDates.WEEK_BEGIN_DATE", $"retailDates.WEEK_END_DATE") 
    , "inner") 



val userAndRetailDatesAndUserDim = userAndRetailDates 
    .join((userDimDfBroadcast.value) 
    .withColumnRenamed("USER_ID", "USER_DIM_USER_ID") 
    .withColumnRenamed("USER_CNTRY_ID","USER_DIM_COUNTRY_ID") 
    .as("userdim") 
    , userAndRetailDates("USER_ID") <=> $"userdim.USER_DIM_USER_ID" 
     && userAndRetailDates("USER_CNTRY_ID") <=> $"userdim.USER_DIM_COUNTRY_ID" 
    , "inner") 

userAndRetailDatesAndUserDim.show() 

謝謝, Prasad。

回答

0

這裏沒什麼可繼續的(即使你的數據或者甚至是列/表名都是機密的,看到一些代碼可能會顯示你正在嘗試實現的內容也是有用的),但是CartesianProduct肯定是一個問題。 O(N^2)是你在大型數據集上真正想要避免的東西,在這種特殊情況下,它會觸發Spark中的所有弱點。

一般來說如果加入膨脹以顯式的笛卡爾乘積或等效操作這意味着表達不是基於平等該連接,因此不能使用基於加入洗牌(或廣播+散列)(SortMergeJoinHashJoin)進行優化。

編輯

在你的情況下的條件是最有可能的問題:

userDf("CAL_DT") between(
    $"retailDates.WEEK_BEGIN_DATE", $"retailDates.WEEK_END_DATE") 

這將是更好的計算例如WEEK_BEGIN_DATEuserDf,直接加入

$"userDf.WEEK_BEGIN_DATE" === $"retailDates.WEEK_BEGIN_DATE" 

另一個小改進是解析日期而不使用UDF例如e與unix_timestamp函數。

編輯

這裏還有一個問題,指出了rchukh<=>在星火< = 1.6擴展到笛卡爾積 - SPARK-11111

相關問題