2016-03-12 34 views
1

我需要使用Apache Spark實現一個工作流的幫助。我的任務在下:通過Spark分別逐個處理多個文件

  1. 我有幾個CSV文件作爲源數據。注意:這些文件可能有不同的佈局
  2. 我有元數據信息我如何需要解析每個文件(這不是問題)
  3. 主要目標:結果是包含多個附加列的源文件。我必須更新每個源文件而不加入一個輸出範圍。例如:源10個文件 - > 10個結果文件,每個結果文件只有來自相應源文件的數據。

據我所知星火可以通過面具打開多個文件:

var source = sc.textFile("/source/data*.gz"); 

但在這種情況下,我不能識別文件的哪一行。如果我得到的源文件的列表,並嘗試過程以下情形:

JavaSparkContext sc = new JavaSparkContext(...); 
List<String> files = new ArrayList() //list of source files full name's 
for(String f : files) 
{ 
    JavaRDD<String> data = sc.textFile(f); 
    //process this file with Spark 
    outRdd.coalesce(1, true).saveAsTextFile(f + "_out"); 
} 

但在這種情況下,我會處理在連續模式下的所有文件。

我的問題是下一個:我怎麼可以在並行模式下處理很多文件?例如:一個文件 - 一個執行者?

非常感謝您的幫助!

回答

0

下面是步驟

  1. 使用sparkcontext.wholeTextFiles(「/路徑/到/文件夾/含有/所有/文件」)
  2. 上述返回一個RDD其中鍵是文件的路徑和值是文件的內容
  3. rdd.map(拉姆達X:X [1]) - 這給你只文件內容的RDD
  4. rdd.map(拉姆達X:customeFunctionToProcessFileContent(X))
  5. 由於地圖功能並行工作,你做的操作會更快,而且沒有順序 - 只要你的任務不依賴於彼此,這是並行的主要標準。

上面的工作與默認分區雖然。所以你可能得不到輸入文件計數等於輸出文件數(因爲輸出是分區數)。

根據您的數據,您可以根據計數或任何其他唯一值重新分區RDD,因此最終輸出文件計數等於輸入計數。這種方法將只有並​​行性,但不會達到最佳分區數目所達到的性能。

+0

嗨Ramzy,感謝您的回答,但我有另一個查詢。方法'sparkcontext.wholeTextFiles(「/ path/to/folder/contained/all/files」)'打開並讀取內存中的文件。據我所知,大多數源文件將有大約1-3百萬行,但是多個文件的大小可能高達2-3 GB。這將工作沒有任何內存錯誤? – Yustas

+0

當您使用sc.textFile或sc.wholeTextFiles時,計算尚未開始。只有當您執行處理開始的任何操作時,纔會基於數據集被劃分的默認分區。您可以通過您的RDD.partitions.length獲取分區數量並根據需要進行自定義,並且還可以獲取yourRDD.count()以獲取實際的RDD大小。 – Ramzy

+0

@Ramzy,wholeTextFiles將創建具有文件整個上下文的路徑和值的鍵的RDD。如果某些文件是2-3GB,則會出現明顯的問題(取決於執行程序的內存,但在任何情況下,1個分區的GB都太多) –

0

您可以打開常規java固定大小的線程池(比如說10個線程),並從Callable/Runnable提交sparkAssertFile。 這將提交10個並行作業,並且如果您的火花簇中有足夠的資源 - 它們將並行執行。 類似如下

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.Executor; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 

import com.google.common.collect.Lists; 

public class Test { 

    public static void main(String[] argv) { 
     final JavaSparkContext sc = new JavaSparkContext(...); 
     List<String> files = new ArrayList<>(); //list of source files full name's 
     ExecutorService pool = Executors.newFixedThreadPool(10); 
     List<Future<?>> futures = new ArrayList<>(); 
     for(final String f : files) 
     { 
      Future<?> fut = pool.submit(new Runnable() { 

       @Override 
       public void run() { 
        JavaRDD<String> data = sc.textFile(f); 
        //process this file with Spark 
        outRdd.coalesce(1, true).saveAsTextFile(f + "_out"); 

       } 
      }); 
      futures.add(fut); 

     } 
     //waiting for all tasks 
     for (Future<?> fut : futures) { 
      fut.get(); 
     } 
    } 
} 
+0

謝謝,我認爲這是有道理的。我會嘗試這種方法。 – Yustas

+0

我可以知道如何爲線程定義任務,以及如何收集和呈現線程。採用這種方法,10的並行性會實現嗎?應用程序Mapreduce和spark應用於並行處理。請重溫基礎知識可能會發現它們是否符合要求 – Ramzy

+0

@Yustas,我已經添加了一些代碼,將您的任務包裝在Runnable –