2012-05-30 52 views
4

我嘗試運行一個 Pig 腳本，它會調用一個用 Java 寫成的用戶定義函數（UDF）。我用一個只有 264 字節的非常小的文件來測試這個腳本，結果還是出現了 Java 堆空間（heap space）錯誤，作業失敗。我已經嘗試加上 -Xms1024M 選項來運行作業，較小的文件可以運行成功，但較大的文件仍然失敗。我的集羣足夠強大，不應該被這麼小的文件絆倒，所以我想知道如何解決這個內存泄漏問題。有人可以幫忙嗎？

Pig JVM Java 堆空間錯誤

import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.*;

import com.tictactec.ta.lib.CoreAnnotated;
import com.tictactec.ta.lib.MAType;
import com.tictactec.ta.lib.MInteger;
import com.tictactec.ta.lib.RetCode;
import com.tictactec.ta.lib.meta.annotation.InputParameterInfo;
import com.tictactec.ta.lib.meta.annotation.InputParameterType;
import com.tictactec.ta.lib.meta.annotation.OptInputParameterInfo;
import com.tictactec.ta.lib.meta.annotation.OptInputParameterType;
import com.tictactec.ta.lib.meta.annotation.OutputParameterInfo;
import com.tictactec.ta.lib.meta.annotation.OutputParameterType;

public class taLib extends EvalFunc<DataBag> 
{ 

    private static final int MIN_ARGS = 3; 

    public static CoreAnnotated core = new CoreAnnotated(); 
    private static Method func_ref = null; 

    public DecimalFormat df = new DecimalFormat("#.###"); 


    public DataBag exec(Tuple args) throws IOException 
    { 

     DataBag input=null; 
     MInteger outStart = new MInteger(); 
     MInteger outLen = new MInteger(); 
     Map<String,Object>outputParams=new HashMap<String, Object>(); 
     String func_name; 
     List<Integer> ip_colmns= new ArrayList<Integer>(); 
     List<double[]>ip_list=new ArrayList<double[]>(); 
     List<String>opt_type=new ArrayList<String>(); 
     List<Object>opt_params=new ArrayList<Object>(); 
     ////// 

     long m1=Runtime.getRuntime().freeMemory(); 
     System.out.println(m1); 
     long m2=Runtime.getRuntime().totalMemory(); 
     System.out.println(m2); 
     ////// 
     int ip_noofparams=0; 
     int op_noofparams=0; 
     int opt_noofparams=0; 

     if (args == null || args.size() < MIN_ARGS) 
      throw new IllegalArgumentException("talib: must have at least " + 
MIN_ARGS + " args"); 

     if(args.get(0) instanceof DataBag) 
     {input = (DataBag)args.get(0);} 
     else{throw new IllegalArgumentException("Only a valid bag name can be 
passed");} 

     // get no of fields in bag 
     Tuple t0=input.iterator().next(); 
     int fields_in_bag=t0.getAll().size(); 

     if(args.get(1) instanceof String) 
     {func_name = (String)args.get(1);} 
     else{throw new IllegalArgumentException("Only valid function name can be 
passed at arg 1");} 
     func_ref=methodChk(func_name); 

     if (func_ref == null) { 
      throw new IllegalArgumentException("talib: function " 
           + func_name + " was not found"); 
     } 

     for (Annotation[] annotations : func_ref.getParameterAnnotations()) 
      { 
       for (Annotation annotation : annotations) 
       { 
       if(annotation instanceof InputParameterInfo) 
       { 
       InputParameterInfo inputParameterInfo = 
(InputParameterInfo)annotation; 


if(inputParameterInfo.type().equals(InputParameterType.TA_Input_Price)) 
        { 

ip_noofparams=numberOfSetBits(inputParameterInfo.flags()); 
        } 
        else 
        { 
        ip_noofparams++; 
        } 
       } 
       if(annotation instanceof OptInputParameterInfo) 
       { 
        OptInputParameterInfo optinputParameterInfo= 
(OptInputParameterInfo)annotation; 
        opt_noofparams++; 
        if 
(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerRange)) 
        { 
         opt_type.add("Integer"); 
        } 
        else 
if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_RealRange)) 
        { 
         opt_type.add("Double"); 
        } 
        else 
if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerList)) 
        { 
         opt_type.add("String"); 
        } 
        else{throw new IllegalArgumentException("whoopsie ...serious 
mess in opt_annotations");} 

       } 
       if (annotation instanceof OutputParameterInfo) 
       { 
          OutputParameterInfo outputParameterInfo = 
(OutputParameterInfo) annotation; 
          op_noofparams++; 
        if 
(outputParameterInfo.type().equals(OutputParameterType.TA_Output_Real)) 
        { 
           outputParams.put(outputParameterInfo.paramName(), new 
double[(int) input.size()]); 
        } 
        else if  
(outputParameterInfo.type().equals(OutputParameterType.TA_Output_Integer)) 
        { 
           outputParams.put(outputParameterInfo.paramName(), new 
    int[(int)input.size()]); 
          } 
       } 
       } 
      } 

     int total_params =ip_noofparams+opt_noofparams; 
     if((args.size()-2)!=total_params){throw new IllegalArgumentException("Wrong 
no of argumets passed to UDF");} 


     // get the ip colmns no's 
     for(int i=2;i<(2+ip_noofparams);i++) 
     { 
      if(args.get(i) instanceof Integer) 
      { 
       if((Integer)args.get(i)>=0 && (Integer)args.get(i)<fields_in_bag) 
      { 
      ip_colmns.add((Integer) args.get(i)); 
      } 
      else{throw new IllegalArgumentException("The input colmn specified 
is invalid..please enter a valid colmn no:0-"+(fields_in_bag-1));}  
      } 
       else{throw new IllegalArgumentException("Wrong arguments entered: 
Only"+ip_noofparams+"field no's of type(integer) allowed for fn"+func_name); } 

     } 

     // create a list of ip arrays 
     for(int i=0;i<ip_colmns.size();i++) 
     { 
     ip_list.add((double[]) Array.newInstance(double.class, (int)input.size())); 
     } 
     int z=0; 
     int x=0; 
     // fill up the arrays 
     for(Tuple t1: input) 
     {  

      Iterator<double[]> itr=ip_list.iterator(); 
      z=0; 
      while(itr.hasNext()) 
      { 

      if((Double)t1.get(ip_colmns.get(z)) instanceof Double) 
      { 
       ((double[])itr.next())[x]=(Double) t1.get(ip_colmns.get(z++)); 
      } 
      else{throw new IllegalArgumentException("Illegal argument while 
filling up array...only double typr allowed");} 
      } 
      x++; 
     } 

     //deal with opt params 
     int s=0; 
     for(int i=(2+ip_noofparams);i<(2+ip_noofparams+opt_noofparams);i++) 
     { 



if(opt_type.get(s).equalsIgnoreCase(args.get(i).getClass().getSimpleName().toString())) 
      {  
       if(opt_type.get(s).equalsIgnoreCase("String")) 
       { 
        String m=args.get(i).toString().toLowerCase(); 
        String ma=m.substring(0, 1).toUpperCase(); 
        String mac=m.substring(1); 
        String macd=ma+mac; 
        MAType type =MAType.valueOf(macd); 
        opt_params.add(type); 
        s++; 
       } 

       else{ 
        opt_params.add(args.get(i)); 
        s++; 
        } 

      } 
      else if(opt_type.get(s).equalsIgnoreCase("Double")) 
        { 


if(args.get(i).getClass().getSimpleName().toString().equalsIgnoreCase("Integer")) 
        { 
        opt_params.add((Double)((Integer)args.get(i)+0.0)); 
        s++; 
        } 
        else{throw new IllegalArgumentException("Opt arguments do 
not match for fn:"+func_name+", pls enter opt arguments in right order"); } 
        } 
      else{throw new IllegalArgumentException("Opt arguments do not match 
for fn:"+func_name+", pls enter opt arguments in right order");} 

     } 


     List<Object> ta_argl = new ArrayList<Object>(); 
     ta_argl.add(new Integer(0)); 
     ta_argl.add(new Integer((int)input.size() - 1)); 
     for(double[]in: ip_list) 
     { 
     ta_argl.add(in); 
     } 

     if(opt_noofparams!=0) 
     {ta_argl.addAll(opt_params);} 
     ta_argl.add(outStart); 
     ta_argl.add(outLen); 

     for (Map.Entry<String, Object> entry : outputParams.entrySet()) 
      { 
      ta_argl.add(entry.getValue()); 
      } 


      RetCode rc = RetCode.Success; 
     try { 
      rc = (RetCode)func_ref.invoke(core, ta_argl.toArray()); 
      } catch (Exception e) 
        { 
      assert false : "I died in ta-lib, but Java made me a zombie..."; 
      } 

     assert rc == RetCode.Success : "ret code from " + func_name; 



     if (outLen.value == 0) return null; 

     ////// 
     DataBag ret=null; 
     ret =outTA(input,outputParams,outStart); 
     outputParams.clear(); 
     ip_list.clear(); 
     opt_params.clear(); 
     opt_type.clear(); 
     ip_colmns.clear(); 
     Runtime.getRuntime().gc(); 
     return ret; 

    } 





    public DataBag outTA(DataBag bag,Map<String, Object> outputParams,MInteger outStart) 
    { 
     DataBag nbag=null; 
     TupleFactory mTupleFactory=TupleFactory.getInstance(); 
     BagFactory mBagFactory=BagFactory.getInstance(); 
     nbag=mBagFactory.newDefaultBag(); 
     Tuple tw=bag.iterator().next(); 
     int fieldsintup=tw.getAll().size(); 


     for(Tuple t0: bag) 
     { 
      Tuple t1=mTupleFactory.newTuple(); 

      for(int z=0;z<fieldsintup;z++) 
      { 
       try { 
        t1.append(t0.get(z)); 
       } catch (Exception e) { 
        // TODO Auto-generated catch block 
        System.out.println("Ouch"); 
       } 
      } 
      nbag.add(t1); 
     } 

     int i = 0; 
     int j=0;   
     for (Tuple t2: nbag) 
     { 
     if(i>=outStart.value) 
      { 
      for(Map.Entry<String,Object>entry: outputParams.entrySet()) 
      { 
      t2.append(entry.getKey().substring(3).toString()); 

      if(entry.getValue() instanceof double[]) 
       { 
       t2.append(new Double 
(df.format(((double[])entry.getValue())[j]))); 
       } 
      else if(entry.getValue() instanceof int[]) 
       { 
       t2.append(((int[])entry.getValue())[j]); 
       } 
      else{throw new 
IllegalArgumentException(entry.getValue().getClass()+"not supported");} 
      }  
      i++;j++; 
      } 
      else 
      {t2.append(0.0); 
      i++;  
       } 

     } 

     return nbag; 
    } 

    public Method methodChk(String fn) 
    { 
     String fn_name=fn; 
     Method tmp_fn=null; 
     for (Method meth: core.getClass().getDeclaredMethods()) 
    { 
     if (meth.getName().equalsIgnoreCase(fn_name)) 
     { 
     tmp_fn = meth; 
     break; 
     } 
    } 
     return tmp_fn; 
    } 


    public int numberOfSetBits(int i) { 
     i = i - ((i >> 1) & 0x55555555); 
     i = (i & 0x33333333) + ((i >> 2) & 0x33333333); 
     return ((i + (i >> 4) & 0xF0F0F0F) * 0x1010101) >> 24; 
    } 

} 
+0

你能把 UDF 的代碼貼出來嗎？或者至少描述一下它在做什麼？ –

+0

這個 UDF 基本上是根據用戶傳給它的參數，對時間序列數據計算一個（由用戶指定的）函數。我不認爲我能貼出代碼……它太大了，但我可以貼出我收到的錯誤信息。 – user1426777

+0

嗯，我之所以問，是因爲如果你把值累積到一個集合中，而這個集合變得很大，就很容易耗盡內存。也許你在兩次調用之間沒有清空列表或集合，也可能你一直往緩衝區追加內容卻沒有清空，等等。僅憑你目前提供的信息，沒有人能幫到你——這就像我說「我的車壞了，請幫我修好，但我不能讓你看或碰它」。 –

回答

4

可能是 BZIP 編解碼器的問題——它的 API 文檔指出這個編解碼器相當耗內存：

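When you increased the memory with -Xms2048m, did you set the option for the Pig grunt shell, or for the map/reduce jobs? Note also that -Xms only sets the initial heap size. A heap-space error is hit at the maximum heap size, which is set with -Xmx, as in the line below: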

set mapred.child.java.opts=-Xmx2048m 

你可以在 JobTracker 中檢查：找到失敗的作業，打開 job.xml，然後找到 mapred.child.java.opts 的值。

+1

我設置的是 mapred 作業的選項，所以我的命令看起來像這樣：pig -Dpig.mapred.child.java.opts=-Xms1024M – user1426777

+0

好吧，又摸索了一番之後，我可以肯定地說，是 bz2 壓縮導致了堆空間問題……所以我改用了 gzip，這似乎可行。 – user1426777

+0

非常感謝這個建議……但是現在每當我嘗試處理一個 8 GB 的大文件時，作業都會崩潰，並出現這個錯誤。 – user1426777