我需要根據UDF的評估來填充字段。輸入到UDF將是輸入中的一些其他字段以及csv表。目前,我採取的方法是加載CSV文件,將其歸爲ALL,然後作爲一個包傳遞給UDF以及其他所需的參數。然而,對於170k記錄的源數據以及大約150k的csv記錄,花費很長時間才能完成該過程(大約3小時)。在UDF中加載文件
我確定必須有更好的方法來處理這個問題,因此需要你的輸入。
source_alias = LOAD 'src.csv' USING
PigStorage(',') AS (f1:chararray,f2:chararray,f3:chararray);
csv_alias = LOAD 'csv_file.csv' USING
PigStorage(',') AS (c1:chararray,c2:chararray,c3:chararray);
grpd_csv_alias = GROUP csv_alias ALL;
final_alias = FOREACH source_alias GENERATE f1 AS f1,
myUDF(grpd_csv_alias, f2) AS derived_f2;
這是我的UDF的高層次。
public class myUDF extends EvalFunc<String> {
public String exec(Tuple input) throws IOException {
String f2Response = "N";
DataBag csvAliasBag = (DataBag)input.get(0);
String f2 = (String) input.get(1);
try {
Iterator<Tuple> bagIterator = csvAliasBag.iterator();
while (bagIterator.hasNext()) {
Tuple localTuple = (Tuple)bagIterator.next();
String col1 = ((String)localTuple.get(1)).trim().toLowerCase();
String col2 = ((String)localTuple.get(2)).trim().toLowerCase();
String col3 = ((String)localTuple.get(3)).trim().toLowerCase();
String col4 = ((String)localTuple.get(4)).trim().toLowerCase();
<Custom logic to populate f2Response based on the value in f2 and as well as col1, col2, col3 and col4>
}
}
return f2Response;
}
catch(Exception e){
throw new IOException("Caught exception processing input row ", e);
}
}
}
我相信過程花費的時間太長,因爲建築和傳球csv_alias到UDF源文件中的每一行。
有沒有更好的方法來處理這個問題?
謝謝
謝謝@TheCowGoesMoo我以後用類似的方法實現了。這絕對有助於處理時間。 – greenhorntechie