2016-09-21 91 views
3

假設我們有一個健康的羣集和使用情況下,我們有性能加入火花SQL

1個Billlion +記錄

兩個數據集我們需要將這兩個數據集比較和找出

重複的原始數據集

我正打算與那些對被檢查 列加入到寫

SQL查詢複製

我想知道如何將是

性能此查詢以及也可以在數據集中完成的改進

(數據幀分區),然後加入它們。

請確認您的看法。

+0

都是每個10億條記錄的數據集? – avrsanjay

+0

是的VRSA。兩者都是10億加 –

+0

「重複」是指數據集1中的數據集2中還存在元組嗎? –

回答

1

無法預測此類訂單的數據集的查詢性能,但可以進行處理。我使用了7億條記錄的數據集,下面是幫助調整我的應用程序的重要屬性。

  • spark.sql.shuffle.partitions(找到你自己的甜蜜點)
  • spark.serializer(最好KryoSerializer)

也爲您的應用程序分配的集羣資源是相當重要的。請參閱blog。謝謝。

+0

感謝VRSA。請你也可以建議我,我怎樣才能分配我的數據集加入前,因爲我的連接條件是動態的。 –

3

我想知道如何將性能

相比呢?至於絕對數字,我認爲這顯然取決於你的數據和你的集羣。

但是在Spark 2.0中,性能改進非常顯着。

和精煉

的催化劑優化是相當不錯的(更所以2.0之後)。下面需要的大部分的優化,例如列修剪護理,謂語下推等(在2.0也有代碼生成這需要產生一個非常優化的代碼,實現了非常大的性能改進的服務。)

而這些同樣的改進無論您是使用DataFrames/Datasets API還是SQL,都可以全面使用。

作爲Spark的催化劑所做的一種查詢優化的例子,假設您有兩個具有相同模式(與您的情況相同)的數據框df1和df2,並且您希望在某些列上加入它們以僅獲得交集並輸出那些元組。

比方說,我的數據幀模式是如下(主叫df.schema):

StructType(
StructField(df.id,StringType,true), 
StructField(df.age,StringType,true), 
StructField(df.city,StringType,true), 
StructField(df.name,StringType,true)) 

這是我們有ID,年齡,城市,名字列在我的數據集。

現在給你想做的事情。如果你看看上面你實際規劃會注意到通過催化劑優化的引擎蓋下做了很多的優化,你會做這樣的事情

df1.join(
    df2, 
    $"df2.name"===$"df1.name" 
     ).select("df1.id","df1.name", "df1.age", "df1.city").show 

什麼:

== Physical Plan == 
*Project [df1.id#898, df1.name#904, df1.age#886, df1.city#892] 
+- *BroadcastHashJoin [df1.name#904], [df2.name#880], Inner, BuildRight 
    :- *Project [age#96 AS df1.age#886, city#97 AS df1.city#892, id#98 AS df1.id#898, name#99 AS df1.name#904] 
    : +- *Filter isnotnull(name#99) 
    :  +- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string> 
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])) 
     +- *Project [name#99 AS df2.name#880] 
     +- *Filter isnotnull(name#99) 
      +- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string> 

`

特別注意,即使兩個相同dataframes被加入他們正在讀不同 -

  1. 謂詞下推:從查詢很明顯星火對於DF2所有你需要的是name列(而不是與id, age等整條記錄)。如果將這些信息推送到正在讀取數據的位置,這不是很好嗎?這將使我無法讀取我不打算使用的不必要數據。這正是Spark所做的!加入Spark的一面將只讀name列。此行: +- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>但是,對於另一方df1,我們希望在連接後的結果中包含所有四列。 Spark再次指出了這一點,並且爲此它讀取了所有四列。此行:+- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>

  2. 還剛剛閱讀和加入之前Spark發現你加入name列。所以在加入之前,它刪除了名稱爲null的元組。此行:+- *Filter isnotnull(name#99)

這意味着Spark已經爲您做了所有這些繁重的工作,以便讀取最小數據並將其存入內存(從而減少混洗和計算時間)。

但是,對於您的特定情況,您可能想要考慮是否可以進一步減少讀取的數據 - 至少在連接的一側。如果在df2中有很多行具有相同的鍵組合,那麼您將與df1匹配的組合如何。首先在df2上做一個明確的分析,你會不會變得更好?即喜歡的東西:

df1.join(
    df2.select("df2.name").distinct, 
    $"df2.name"===$"df1.name" 
    ).select("df1.id","df1.name", "df1.age", "df1.city") 
0

您是否嘗試過增加執行核心,4個或更多基於您的羣集配置,也同時做了火花提交更好更何況沒有。的執行者。讓火花決定不。的執行者被使用。在處理大量數據集時,這可能會在某種程度上提高性能。

+0

越來越多的執行者核心將進入優化的執行級別。現在我正在嘗試準備我的數據集,並根據列找到它們之間的重複項。 –