2016-04-19 42 views
2

我試圖學習使用Dataframes進行編程。使用下面的代碼,我試圖在列上加入兩個CSV,然後將其另存爲組合的CSV。在SCALA IDE中運行該代碼,我看到了近200個小部分文件。你能幫我明白了什麼錯誤這裏 - 使用加入後創建大量零件文件後保存Dataframe

import org.apache.spark.SparkContext 

object JoinData { 
    def main(args: Array[String]) { 
    val sc = new SparkContext(args(0), "Csv Joining example")  
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val df1 = sqlContext.load("com.databricks.spark.csv", Map("path" -> args(1), "header" -> "true")) 
    val df2 = sqlContext.load("com.databricks.spark.csv", Map("path" -> args(2), "header" -> "true")) 
    import org.apache.spark.sql.functions._ 
    val df_join = df1.join(df2, df1("Dept") === df2("dept"), "inner") 
    df_join.repartition(1) //This is also not helping 
    //Below line is generating 200 part files in Output_join folder 
    df_join.save("Output_join","com.databricks.spark.csv", org.apache.spark.sql.SaveMode.Overwrite) 

    } 
} 

程序參數 - 本地的src /主/資源/ emp.csv的src /主/資源/ dept.csv

正在使用的CSV數據

empId,empName,Dept,salution 
111,ABC,sales,mr 
112,ABC,it,mr 
113,ABC,tech,mr 
114,ABC,sales,mr 
115,ABC,sales,mr 
116,ABC,it,mr 
117,ABC,tech,mr 

dept,name 
sales,Sales of Cap 
it,Internal Training 
tech,Tech staff 
support,support services 

控制檯輸出

[Stage 4:>              (2 + 1)/200] 
[Stage 4:=>              (4 + 1)/200] 
[Stage 4:=>              (6 + 1)/200] 
[Stage 4:==>              (8 + 1)/200] 
[Stage 4:===>             (11 + 1)/200] 
[Stage 4:===>             (14 + 1)/200] 
[Stage 4:====>             (17 + 1)/200] 
[Stage 4:=====>             (19 + 1)/200] 
[Stage 4:=====>             (21 + 1)/200] 
[Stage 4:======>             (24 + 1)/200] 
[Stage 4:=======>            (26 + 1)/200] 
[Stage 4:=======>            (28 + 1)/200] 
[Stage 4:========>            (30 + 1)/200] 
[Stage 4:========>            (32 + 1)/200] 
[Stage 4:=========>            (34 + 1)/200] 
[Stage 4:==========>            (37 + 1)/200] 
[Stage 4:===========>           (40 + 1)/200] 
[Stage 4:============>           (43 + 1)/200] 
[Stage 4:============>           (46 + 1)/200] 
[Stage 4:=============>           (49 + 1)/200] 
[Stage 4:==============>           (52 + 1)/200] 
[Stage 4:===============>          (55 + 1)/200] 
[Stage 4:================>          (58 + 1)/200] 
[Stage 4:=================>          (61 + 1)/200] 
[Stage 4:=================>          (64 + 1)/200] 
[Stage 4:==================>          (67 + 1)/200] 
[Stage 4:===================>         (69 + 1)/200] 
[Stage 4:====================>         (72 + 1)/200] 
[Stage 4:=====================>         (75 + 1)/200] 
[Stage 4:=====================>         (78 + 1)/200] 
[Stage 4:======================>         (81 + 1)/200] 
[Stage 4:=======================>        (84 + 1)/200] 
[Stage 4:========================>        (87 + 1)/200] 
[Stage 4:=========================>        (90 + 1)/200] 
[Stage 4:=========================>        (92 + 1)/200] 
[Stage 4:==========================>        (95 + 1)/200] 
[Stage 4:===========================>       (98 + 1)/200] 
[Stage 4:===========================>       (101 + 1)/200] 
[Stage 4:============================>       (104 + 1)/200] 
[Stage 4:=============================>       (107 + 1)/200] 
[Stage 4:==============================>      (110 + 1)/200] 
[Stage 4:===============================>      (113 + 1)/200] 
[Stage 4:===============================>      (116 + 1)/200] 
[Stage 4:================================>      (119 + 1)/200] 
[Stage 4:=================================>      (122 + 1)/200] 
[Stage 4:=================================>      (123 + 1)/200] 
[Stage 4:==================================>     (126 + 1)/200] 
[Stage 4:===================================>     (129 + 1)/200] 
[Stage 4:====================================>     (132 + 1)/200] 
[Stage 4:=====================================>     (135 + 1)/200] 
[Stage 4:=====================================>     (138 + 1)/200] 
[Stage 4:======================================>    (140 + 1)/200] 
[Stage 4:======================================>    (141 + 1)/200] 
[Stage 4:=======================================>    (144 + 1)/200] 
[Stage 4:========================================>    (148 + 1)/200] 
[Stage 4:=========================================>    (151 + 1)/200] 
[Stage 4:==========================================>   (154 + 1)/200] 
[Stage 4:==========================================>   (156 + 2)/200] 
[Stage 4:===========================================>   (159 + 1)/200] 
[Stage 4:============================================>   (161 + 1)/200] 
[Stage 4:============================================>   (162 + 1)/200] 
[Stage 4:=============================================>   (164 + 1)/200] 
[Stage 4:=============================================>   (165 + 1)/200] 
[Stage 4:==============================================>  (168 + 1)/200] 
[Stage 4:===============================================>  (171 + 1)/200] 
[Stage 4:===============================================>  (174 + 1)/200] 
[Stage 4:================================================>  (177 + 1)/200] 
[Stage 4:=================================================>  (180 + 1)/200] 
[Stage 4:==================================================> (183 + 1)/200] 
[Stage 4:===================================================> (186 + 1)/200] 
[Stage 4:===================================================> (189 + 1)/200] 
[Stage 4:=====================================================> (193 + 1)/200] 
[Stage 4:=====================================================> (196 + 1)/200] 
[Stage 4:======================================================>(199 + 1)/200] 

回答

0

多個文件的原因是因爲計算是分佈式的。簡而言之,輸出文件的數量等於您的Dataframe/RDD中的分區數量。您可以調用Dataframe/RDD上的重新分區或合併來更改分區數。 '重新分區'將洗牌原始分區,然後重新分區,而'coalesce'只會將原始分區結合到新的分區數量。

val resultDF = df_join.coalesce(10) 

配置根據您的規格內COALESCE值。我用coalesce而不是重新分區是因爲洗牌(由重新分區完成)可能非常昂貴。如果你只是想減少分區的數量,合併將是最好的方式。

+0

感謝您的答覆。 Dataframe中不允許使用df_join.coalesce(10)。重新分配沒有幫助。 Spark自動生成200個分區 –

+0

您使用的是什麼版本的火花? – dheee