我需要使用Apache Spark實現一個工作流的幫助。我的任務在下:通過Spark分別逐個處理多個文件
- 我有幾個CSV文件作爲源數據。注意:這些文件可能有不同的佈局
- 我有元數據信息我如何需要解析每個文件(這不是問題)
- 主要目標:結果是包含多個附加列的源文件。我必須更新每個源文件而不加入一個輸出範圍。例如:源10個文件 - > 10個結果文件,每個結果文件只有來自相應源文件的數據。
據我所知星火可以通過面具打開多個文件:
var source = sc.textFile("/source/data*.gz");
但在這種情況下,我不能識別文件的哪一行。如果我得到的源文件的列表,並嘗試過程以下情形:
JavaSparkContext sc = new JavaSparkContext(...);
List<String> files = new ArrayList() //list of source files full name's
for(String f : files)
{
JavaRDD<String> data = sc.textFile(f);
//process this file with Spark
outRdd.coalesce(1, true).saveAsTextFile(f + "_out");
}
但在這種情況下,我會處理在連續模式下的所有文件。
我的問題是下一個:我怎麼可以在並行模式下處理很多文件?例如:一個文件 - 一個執行者?
非常感謝您的幫助!
嗨Ramzy,感謝您的回答,但我有另一個查詢。方法'sparkcontext.wholeTextFiles(「/ path/to/folder/contained/all/files」)'打開並讀取內存中的文件。據我所知,大多數源文件將有大約1-3百萬行,但是多個文件的大小可能高達2-3 GB。這將工作沒有任何內存錯誤? – Yustas
當您使用sc.textFile或sc.wholeTextFiles時,計算尚未開始。只有當您執行處理開始的任何操作時,纔會基於數據集被劃分的默認分區。您可以通過您的RDD.partitions.length獲取分區數量並根據需要進行自定義,並且還可以獲取yourRDD.count()以獲取實際的RDD大小。 – Ramzy
@Ramzy,wholeTextFiles將創建具有文件整個上下文的路徑和值的鍵的RDD。如果某些文件是2-3GB,則會出現明顯的問題(取決於執行程序的內存,但在任何情況下,1個分區的GB都太多) –