2015-11-16 65 views
1

我需要根據UDF的評估來填充字段。輸入到UDF將是輸入中的一些其他字段以及csv表。目前,我採取的方法是加載CSV文件,將其歸爲ALL,然後作爲一個包傳遞給UDF以及其他所需的參數。然而,對於170k記錄的源數據以及大約150k的csv記錄,花費很長時間才能完成該過程(大約3小時)。在UDF中加載文件

我確定必須有更好的方法來處理這個問題,因此需要你的輸入。

source_alias = LOAD 'src.csv' USING 
          PigStorage(',') AS (f1:chararray,f2:chararray,f3:chararray); 

csv_alias = LOAD 'csv_file.csv' USING 
          PigStorage(',') AS (c1:chararray,c2:chararray,c3:chararray); 

grpd_csv_alias = GROUP csv_alias ALL; 

final_alias = FOREACH source_alias GENERATE f1 AS f1, 
myUDF(grpd_csv_alias, f2) AS derived_f2; 

這是我的UDF的高層次。

public class myUDF extends EvalFunc<String> { 

    public String exec(Tuple input) throws IOException { 

     String f2Response = "N"; 
     DataBag csvAliasBag = (DataBag)input.get(0); 
     String f2 = (String) input.get(1); 

     try { 

       Iterator<Tuple> bagIterator = csvAliasBag.iterator(); 


       while (bagIterator.hasNext()) { 
        Tuple localTuple = (Tuple)bagIterator.next(); 
        String col1 = ((String)localTuple.get(1)).trim().toLowerCase(); 
        String col2 = ((String)localTuple.get(2)).trim().toLowerCase(); 
        String col3 = ((String)localTuple.get(3)).trim().toLowerCase(); 
        String col4 = ((String)localTuple.get(4)).trim().toLowerCase(); 

        <Custom logic to populate f2Response based on the value in f2 and as well as col1, col2, col3 and col4> 

       } 
      } 

      return f2Response; 
     } 
     catch(Exception e){ 
      throw new IOException("Caught exception processing input row ", e); 
     } 


    } 

} 

我相信過程花費的時間太長,因爲建築和傳球csv_alias到UDF源文件中的每一行。

有沒有更好的方法來處理這個問題?

謝謝

回答

1

對於小文件,您可以將它們放在分佈式緩存上。這將文件作爲本地文件複製到每個任務節點,然後自己加載它。這是Pig docs UDF section的一個例子。但是,我不建議每次都解析文件。將結果存儲在類變量中,並檢查它是否已被初始化。如果csv在本地文件系統上,請使用getShipFiles。如果您使用的csv在HDFS上,則使用getCachedFiles方法。請注意,對於HDFS,文件路徑後跟一個#和一些文本。在#的左邊是HDFS路徑,右邊是你希望它被複制到本地文件系統時調用的名稱。

public class Udfcachetest extends EvalFunc<String> { 

public String exec(Tuple input) throws IOException { 
    String concatResult = ""; 
    FileReader fr = new FileReader("./smallfile1"); 
    BufferedReader d = new BufferedReader(fr); 
    concatResult +=d.readLine(); 
    fr = new FileReader("./smallfile2"); 
    d = new BufferedReader(fr); 
    concatResult +=d.readLine(); 
    return concatResult; 
} 

public List<String> getCacheFiles() { 
    List<String> list = new ArrayList<String>(1); 
    list.add("/user/pig/tests/data/small#smallfile1"); // This is hdfs file 
    return list; 
} 

public List<String> getShipFiles() { 
    List<String> list = new ArrayList<String>(1); 
    list.add("/home/hadoop/pig/smallfile2"); // This local file 
    return list; 
} 
} 
+0

謝謝@TheCowGoesMoo我以後用類似的方法實現了。這絕對有助於處理時間。 – greenhorntechie