2014-06-23 101 views
59

在Spark中使用Scala時,無論何時使用saveAsTextFile轉儲結果,它似乎都將輸出拆分爲多個部分。我只是傳遞一個參數(路徑)。如何讓saveAsTextFile不將輸出分割成多個文件?

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) 
year.saveAsTextFile("year") 
  1. 是否輸出的數量對應於它使用減速器的數量?
  2. 這是否意味着輸出被壓縮?
  3. 我知道我可以將輸出結合在一起使用bash,但有沒有一個選項可以將輸出存儲在單個文本文件中,而不會分裂?我查看了API文檔,但沒有多說這個。
+1

如果文件很大,在大數據中只使用一個文件通常是不好的做法。 – samthebest

+0

如果輸出結果是排序文件,那麼最佳做法是什麼?把它保存爲一個文件集合,並使許多輸出文件名稱成爲某種索引(即第一個文件名稱爲「aa」,中間的文件名稱會像「fg」,最後一個是「zzy」)? – Rdesmond

+0

通常情況下,重火花工作只會產生非常小的輸出(彙總,kpis,popularities,...),這是在hdfs上產生的,但最有可能是後者在與大數據無關的應用程序中使用。在這種情況下,更清潔,更容易擁有一個有名的單一文件傳輸和消費。 –

回答

84

將其保存爲多個文件的原因是因爲計算是分佈式的。如果輸出是足夠小,你覺得你能適應這臺機器上,那麼你就可以

val arr = year.collect() 

結束你的程序,然後保存生成的陣列作爲一個文件,另一種方法是使用自定義分區程序,partitionBy,並且使它儘可能一切進入一個分區,儘管這不可取,因爲您不會得到任何並行化。

如果您需要將文件與saveAsTextFile一起保存,您可以使用coalesce(1,true).saveAsTextFile()。這基本上意味着做計算然後合併到1分區。您還可以使用repartition(1),它僅僅是​​3210的包裝,shuffle參數設置爲true。通過RDD.scala的來源查看是我如何計算出這些東西的大部分,你應該看一看。

+1

如何將文本文件保存爲數組?數組沒有saveAsTextFile函數。只是爲了RDD。 – user2773013

+2

@ user2773013那麼這種方法將會是'coalesce'或我提出的'partition'方法,但是如果只存儲在1個節點上,那麼在hdfs上存儲真的沒有意義,這就是爲什麼使用collect真的是正確的方法 – aaronman

+1

謝謝@aaronman !!! – user2773013

16

您可以撥打coalesce(1)然後saveAsTextFile() - 但如果您有大量數據,這可能是個壞主意。爲了讓單獨的映射器和簡化器寫入不同的文件,就像在Hadoop中一樣生成每個分割的單獨文件。如果你的數據非常少,那麼只有一個輸出文件是一個好主意,在這種情況下,你也可以使用collect(),就像@aaronman所說的那樣。

+0

尼斯沒有想到'coalesce'清潔比亂搞分區,周圍說,我仍然認爲,如果你的目標是把它收集到一個文件'collect'可能是正確的做法 – aaronman

+1

這個工作。但是,如果您使用聚結,那意味着您只使用1個縮減器。這不會減緩這個過程,因爲只使用1個減速器。 – user2773013

+1

是的,但這是你要求的。 Spark每個分區輸出一個文件。另一方面,你爲什麼關心文件的數量?當讀取Spark中的文件時,您可以指定父目錄,並將所有分區讀作一個RDD。 – David

2

您可以在下一個版本的Spark中使用它,在當前的版本1.0.0中,除非您以某種方式手動執行操作,例如像您提到的那樣使用bash腳本調用,否則無法執行此操作。

+0

感謝您的信息! – user2773013

+0

Spark的下一個版本在這裏,它不是很明顯該怎麼做:( –

1

我也想提一下,文檔中明確指出用戶在調用與真正的少量分區合併時應該小心。這可能會導致上游分區繼承此數量的分區。

我不會推薦使用coalesce(1),除非真的需要。

2

在Spark 1.6.1中,格式如下所示。它創建一個單一的輸出文件。如果輸出足夠小以便處理,最好使用它。基本上它所做的是返回一個新的RDD,該RDD被簡化爲numPartitions分區。如果正在進行劇烈聚合,例如到numPartitions = 1,這可能導致你的計算髮生在比你喜歡

pair_result.coalesce(1).saveAsTextFile("/app/data/") 
2

正如其他人所說,你可以收集或合併數據(例如,一個在numPartitions的情況下= 1點)較少的節點設置爲強制Spark生成單個文件。但是這也限制了可以並行處理數據集的Spark任務的數量。我更喜歡讓它在輸出HDFS目錄中創建一百個文件,然後使用hadoop fs -getmerge /hdfs/dir /local/file.txt將結果提取到本地文件系統中的單個文件中。當你的輸出是一個相對較小的報告時,這是最有意義的。

0

這是我輸出單個文件的答案。我只是說coalesce(1)

val year = sc.textFile("apat63_99.txt") 
       .map(_.split(",")(1)) 
       .flatMap(_.split(",")) 
       .map((_,1)) 
       .reduceByKey((_+_)).map(_.swap) 
year.saveAsTextFile("year") 

代碼:

year.coalesce(1).saveAsTextFile("year") 
1

您可以撥打repartition(),並按照這種方式:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) 

var repartitioned = year.repartition(1) 
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00") 

enter image description here

1

對於那些更大的數據集工作並且仍願意從中獲利火花的平行性,rdd.coalesce(1).saveAsTextFile("path")不是解決方案。整個管道(從最後的火花動作到存儲)將在1個執行器上執行。

可以代替第一執行對執行者的任何數量的管道,並使用saveAsTextFile(這將產生輸出多個文件)然後只使用apache FileSystem API合併所有這些文件。

以下方法給出RDD存儲和路徑在哪裏存儲它:

import org.apache.spark.rdd.RDD 
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} 
import org.apache.hadoop.conf.Configuration 

def saveAsSingleTextFile(
    outputRDD: RDD[String], 
    outputFile: String 
): Unit = { 

    // Classic saveAsTextFile in a temporary folder: 
    outputRDD.saveAsTextFile(outputFile + ".tmp") 

    // The facility allowing file manipulations on hdfs: 
    val hdfs = FileSystem.get(new Configuration()) 

    // Merge the folder into a single file: 
    FileUtil.copyMerge(
    hdfs, 
    new Path(outputFile + ".tmp"), 
    hdfs, 
    new Path(outputFile), 
    true, 
    new Configuration(), 
    null) 

    // And we delete the intermediate folder: 
    hdfs.delete(new Path(outputFile + ".tmp"), true) 
} 

這樣的處理仍然分佈和合流部被之後完成的,這限制了在性能上的損失。

在獎金中,您可以提供輸出文件的確切名稱,與生成文件my/path/part-00000的rdd.coalesce(1).saveAsTextFile(「my/path」)相反。

相關問題