2017-03-07 125 views
0

考慮代碼:怪異的行爲

val df1 = spark.table("t1").filter(col("c1")=== lit(127)) 
val df2 = spark.sql("select x,y,z from ORCtable") 
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*), 
    trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter") 
df3.select($"y_R",$"z_R").show(500,false) 

這是生產的代碼失敗java.lang.OutOfMemoryError: GC overhead limit exceeded警告WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.

但是,如果我運行下面的代碼:

val df1 = spark.table("t1").filter(col("c1")=== lit(127)) 
val df2 = spark.sql("select x,y,z from ORCtable limit 2000000")//only difference here 
//ORC table has 1651343 rows so doesn't exceed limit 2000000 
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*), 
    trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter") 
df3.select($"y_R",$"z_R").show(500,false) 

這將產生正確的輸出。我不知道爲什麼會發生這種情況以及發生了什麼變化。有人可以幫助理解這一點嗎?

回答

2

回答我自己的問題:火花physical execution plan是不同的兩種方式產生相同的dataframe可以通過調用.explain()方法來檢查。

第一種方法使用broadcast-hash join,這導致java.lang.OutOfMemoryError: GC overhead limit exceeded,而後一種方式運行sort-merge join,通常速度較慢,但​​不會對垃圾收集造成太大影響。

物理執行計劃的這種差異由df2 dataframe上的附加filter操作引入。