I am running into a performance problem when joining DataFrames created from Avro files using the spark-avro library. The join produces a CartesianProduct in the physical plan on Spark 1.5.2.
The DataFrames are created from 120K Avro files with a total size of about 1.5 TB. Both DataFrames are very large, with billions of records.
The join of these two DataFrames runs forever. The job runs on a YARN cluster with 300 executors, each with 4 executor cores and 8 GB of memory.
Any insight into this join would be helpful. I have posted the explain plans below; I noticed the CartesianProduct in the physical plan and am wondering whether it is causing the performance problem.
Below are the optimized logical plan and the physical plan. (Due to the confidential nature of the data, I cannot post any column or file names here.)
== Optimized Logical Plan ==
Limit 21
 Join Inner, [ Join Conditions ]
  Join Inner, [ Join Conditions ]
   Project [ List of columns ]
    Relation [ List of columns ] AvroRelation[ fileName1 ] -- large file - .5 billion records
   InMemoryRelation [ List of columns ], true, 10000, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None
  Project [ List of Columns ]
   Relation [ List of Columns ] AvroRelation[ fileName2 ] -- another large file - 800 million records

== Physical Plan ==
Limit 21
 Filter (filter conditions)
  CartesianProduct
   Filter (more filter conditions)
    CartesianProduct
     Project (selecting a few columns and applying a UDF to one column)
      Scan AvroRelation[avro file][ columns in Avro File ]
     InMemoryColumnarTableScan [ List of columns ], true, 10000, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None
    Project [ List of Columns ]
     Scan AvroRelation[Avro File][ List of Columns ]
Code Generation: true
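My current theory is that the planner falls back to CartesianProduct because neither join condition is a plain equality it can extract equi-join keys from: the first join uses a between range condition, and the second uses the null-safe <=> operator, which (as far as I can tell on 1.5) is not treated as an equi-join key either. The following toy repro (hypothetical two-row tables, not my real schema) shows the same shape in the explain output; I am not certain the same reasoning applies at my data sizes.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("plan-check").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val events = sc.parallelize(Seq(("a", 5), ("b", 12))).toDF("id", "day")
val weeks = sc.parallelize(Seq((0, 9), (10, 19))).toDF("weekBegin", "weekEnd")

// Range condition, no equality predicate: the physical plan shows a
// CartesianProduct followed by a Filter, just like the plan above.
events.join(weeks, $"day" between($"weekBegin", $"weekEnd"), "inner").explain(true)

// Equality condition on the same tables for comparison: no CartesianProduct.
val weeksByDay = weeks.withColumnRenamed("weekBegin", "day")
events.join(weeksByDay, events("day") === weeksByDay("day"), "inner").explain(true)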
My actual code is shown below.
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.{col, udf}
import com.databricks.spark.avro._
import sqlContext.implicits._

val customerDateFormat = new SimpleDateFormat("yyyy/MM/dd")
val dates = new RetailDates()
val dataStructures = new DataStructures()
// Reading CSV Format input files -- retailDates
// This DF has 75 records
val retailDatesWithSchema = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", ",")
  .schema(dates.retailDatesSchema)
  .load(datesFile)
  .coalesce(1)
  .cache()
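// Note: coalesce(1) is what shows up as (Repartition 1, false) under the
// InMemoryRelation / InMemoryColumnarTableScan entries in the plans above;
// the cached 75-row table sits in a single partition.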
// Create UDF to convert String to Date
val dateUDF: (String => java.sql.Date) = (dateString: String) => new java.sql.Date(customerDateFormat.parse(dateString).getTime())
val stringToDateUDF = udf(dateUDF)
// Reading Avro Format Input Files
// This DF has 500 million records
val userInputDf = sqlContext.read.avro("customerLocation")
val userDf = userInputDf
  .withColumn("CAL_DT", stringToDateUDF(col("CAL_DT")))
  .select("CAL_DT", "USER_ID", "USER_CNTRY_ID")
val userDimDf = sqlContext.read.avro(userDimFiles)
  .select("USER_ID", "USER_CNTRY_ID", "PRIMARY_USER_ID") // This DF has 800 million records
val retailDatesWithSchemaBroadcast = sc.broadcast(retailDatesWithSchema)
val userDimDfBroadcast = sc.broadcast(userDimDf)
val userAndRetailDates = userDf
  .join(retailDatesWithSchemaBroadcast.value.as("retailDates"),
    userDf("CAL_DT") between($"retailDates.WEEK_BEGIN_DATE", $"retailDates.WEEK_END_DATE"),
    "inner")
val userAndRetailDatesAndUserDim = userAndRetailDates
  .join(userDimDfBroadcast.value
      .withColumnRenamed("USER_ID", "USER_DIM_USER_ID")
      .withColumnRenamed("USER_CNTRY_ID", "USER_DIM_COUNTRY_ID")
      .as("userdim"),
    userAndRetailDates("USER_ID") <=> $"userdim.USER_DIM_USER_ID"
      && userAndRetailDates("USER_CNTRY_ID") <=> $"userdim.USER_DIM_COUNTRY_ID",
    "inner")
userAndRetailDatesAndUserDim.show()
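One thing I am unsure about: wrapping the DataFrames in sc.broadcast only ships the driver-side objects to the executors; as far as I know it is not a hint to the join planner. A variant I am considering is sketched below. It uses org.apache.spark.sql.functions.broadcast (available since Spark 1.5) to hint that the 75-row dates table should be broadcast, and it replaces the null-safe <=> with plain ===, which is only valid if USER_ID and USER_CNTRY_ID are never null. I have not verified that this removes the CartesianProduct on my data.

import org.apache.spark.sql.functions.broadcast

// Hint the planner to broadcast the tiny 75-row dates table; since the range
// condition has no equi-join keys, I am hoping for a BroadcastNestedLoopJoin
// here instead of a CartesianProduct.
val userAndRetailDatesHinted = userDf
  .join(broadcast(retailDatesWithSchema.as("retailDates")),
    userDf("CAL_DT") between($"retailDates.WEEK_BEGIN_DATE", $"retailDates.WEEK_END_DATE"),
    "inner")

// Use plain equality so the planner can extract equi-join keys
// (assumes the key columns are never null).
val userDimRenamed = userDimDf
  .withColumnRenamed("USER_ID", "USER_DIM_USER_ID")
  .withColumnRenamed("USER_CNTRY_ID", "USER_DIM_COUNTRY_ID")
  .as("userdim")
val userAndRetailDatesAndUserDimHinted = userAndRetailDatesHinted
  .join(userDimRenamed,
    userAndRetailDatesHinted("USER_ID") === $"userdim.USER_DIM_USER_ID"
      && userAndRetailDatesHinted("USER_CNTRY_ID") === $"userdim.USER_DIM_COUNTRY_ID",
    "inner")

Does this look like the right direction, or is the CartesianProduct coming from something else?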
Thanks, Prasad.