2016-08-21 57 views
3

通常我加載csv文件,然後運行不同類型的聚合,例如Spark的「group by」。我想知道是否可以在文件加載過程中啓動這種操作(通常是幾百萬行),而不是對它們進行序列化,以及它是否值得(節省時間)。Spark:加載過程中的分組

例子:

val csv = sc.textFile("file.csv") 
val data = csv.map(line => line.split(",").map(elem => elem.trim)) 
val header = data.take(1) 
val rows = data.filter(line => header(0) != "id") 
val trows = rows.map(row => (row(0), row)) 
trows.groupBy(//row(0) etc.) 

對於我的星火如何工作的理解,groupBy(或aggregate)將被「推遲」到整個CSV文件的內存加載。如果這是正確的,裝載和分組能否在「相同」時間運行,而不是按照兩個步驟排序?

回答

3

groupBy(或聚合)將被「推遲」到整個文件csv的內存中的加載。

情況並非如此。在本地(單分區)級別,Spark使用惰性序列,因此屬於單個任務的操作(包括地圖側聚合)可以壓縮在一起。

換句話說,當你有一系列方法時,操作是逐行執行而不是通過轉換進行。換句話說,第一行將被映射,過濾,再次映射並在下一個訪問之前傳遞給聚合器。

+0

好的,謝謝。如果是這樣的話,我身邊的任何進一步優化都是無用的。 Spark的高級函數通常都是懶惰的或者有例外? – Randomize

+0

這些東西大部分不是Spark特定的。它只是用來實現內部邏輯的數據結構的一個屬性。但總的來說,我會說Spark在實踐中很有意義。 – zero323

1

要通過負荷運行你可以用2個選項進行啓動組:

  1. 寫自己的裝載機,並通過內部的+ aggregationByKey使自己組。這個缺點是寫更多的代碼&更多維護。
  2. 使用鑲木格式文件作爲輸入+ DataFrames,由於它的柱狀它會讀取你的GROUPBY只使用所需的列。所以它應該更快。 - DataFrameReader

    df = spark.read.parquet('file_path') 
    df = df.groupBy('column_a', 'column_b', '...').count() 
    df.show() 
    

由於Spark是懶惰的,直到調用行動的方法,如顯示它不會加載您的文件/收集/寫入。所以Spark會知道哪些列讀取,哪些忽略了加載過程。