2012-04-04 63 views
2

我正在嘗試編寫一個Java UDF,它會使用一個Java UDF對一個包中的元組進行排名。 元組有一個值列,它是排名的標準和最初設置爲0的列列。 元組根據值列進行排序。 所有的元組放在一個袋子裏,這個袋子放在一個傳遞給UDF的新元組中。apache pig Java UDF - 改變屬性值似乎並不堅持

但是,UDF正在修改等級列 - 但是一旦方法退出,值又全部變爲0。我不確定如何獲取「Stick」值。

任何幫助將不勝感激。

這裏是我的java類

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 
import org.apache.pig.FilterFunc; 
import org.apache.pig.EvalFunc; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.impl.logicalLayer.FrontendException; 
import java.util.Iterator; 
import org.apache.pig.PigWarning; 

/** 
* 
* @author Winter 
*/ 
public class Ranker extends EvalFunc<String>{ 
    @Override 
    public String exec(Tuple tuple) throws IOException { 
     if (tuple == null || tuple.size() == 0) { 
      return null; 
     } 


     List<Object> list = tuple.getAll(); 
     DataBag db = (DataBag) list.get(0); 
     Integer num = (Integer)list.get(1); 

     Iterator<Tuple>itr = db.iterator(); 
     boolean containsNonNull = false; 
     int i = 1; 
     double previous=0; 
     while (itr.hasNext()) { 

      Tuple t= itr.next(); 
      double d = (Double)t.get(num.intValue()); 
      int rankCol = t.size()-1; 
      Integer rankVal = (Integer)t.get(rankCol); 
      if(i == 0){  
       System.out.println("i==0"); 
       previous = d; 
       t.set(rankCol, i); 
      } else { 
       if(d == previous) 
        t.set(rankCol, i); 
       else{ 
        System.out.print("d!==previous|" + d + "|"+ previous+"|"+rankVal); 
        t.set(rankCol, ++i); 
        rankVal = (Integer)t.get(rankCol); 
        System.out.println("|now rank val" + rankVal); 
        previous = d; 
       } 
      } 
     } 


     return "Y"; 
    } 
} 

這裏是我如何打電話豬的一切 -

REGISTER /myJar.jar; 
A = LOAD '/Users/Winter/milk-tea-coffee.tsv' as (year:chararray, milk:double); 
B = foreach A generate year, milk, 0 as rank; 
C = order B by milk asc; 
D = group C by rank order C by milk; 
E = foreach D generate D.C.year,D.C.milk,D.C.rank, piglet3.evalFunctions.Ranker(D.C,1); 
dump E; 

我可以告訴它的UDF裏工作,因爲在UDF中打印報表 - d!==上一頁| 21.2 | 0.0 | 0 |現在排名val2 d!==上一頁| 21.6 | 21.2 | 0 |現在排名val3 d!==上一頁| 21.9 | 21.6 | 0 |現在排名val4 d !==上一頁| 22.0 | 21.9 | 0 |現在排名val5上一頁| 22.5 | 22.0 | 0 |現在排名val6 d!==上一頁| 22.9 | 22.5 | 0 |現在排名val7 d!==上一頁| 23.0 | 22.9 | 0 |現在排名val8 d !==以前| 23.4 | 23.0 | 0 |現在排名val9 d ==以前|!23.8 | 23.4 | 0 |現在排名val10 d ==以前|!23.9 | 23.8 | 0 |現在排名VAl11難

但是當我轉儲出E或D或C時,等級列只包含0。

回答

1

exec函數必須從UDF返回所需的輸出。您當前正在修改傳遞給exec函數的元組,然後返回字符串「Y」 - Pig的所有輸出爲UDF的輸出爲「Y」。在這種情況下,您應該返回Tuple而不是「Y」。

我認爲下面的代碼是接近你的意圖,但我不是你正在嘗試做的相當清楚:

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 
import org.apache.pig.FilterFunc; 
import org.apache.pig.EvalFunc; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.impl.logicalLayer.FrontendException; 
import java.util.Iterator; 
import org.apache.pig.PigWarning; 

/** 
* 
* @author Winter 
*/ 
public class Ranker extends EvalFunc<Tuple>{ 
    @Override 
    public Tuple exec(Tuple tuple) throws IOException { 
     if (tuple == null || tuple.size() == 0) { 
      return null; 
     } 


     List<Object> list = tuple.getAll(); 
     DataBag db = (DataBag) list.get(0); 
     Integer num = (Integer)list.get(1); 

     Iterator<Tuple>itr = db.iterator(); 
     boolean containsNonNull = false; 
     int i = 1; 
     double previous=0; 
     while (itr.hasNext()) { 

      Tuple t= itr.next(); 
      double d = (Double)t.get(num.intValue()); 
      int rankCol = t.size()-1; 
      Integer rankVal = (Integer)t.get(rankCol); 
      if(i == 0){  
       System.out.println("i==0"); 
       previous = d; 
       t.set(rankCol, i); 
      } else { 
       if(d == previous) 
        t.set(rankCol, i); 
       else{ 
        System.out.print("d!==previous|" + d + "|"+ previous+"|"+rankVal); 
        t.set(rankCol, ++i); 
        rankVal = (Integer)t.get(rankCol); 
        System.out.println("|now rank val" + rankVal); 
        previous = d; 
       } 
      } 
     } 


     return tuple; 
    } 
} 
+0

我覺得我的問題的一部分是排名元組你必須將其與它上面的元組相比較,所以你必須在整個包上工作。 – Winter 2012-04-05 12:14:06

+0

例如,一個元組不知道它是否在第二位,除非它能看到它上面的第一位元組。這就是爲什麼我把所有元組放在一個包裏,然後把這個包放到一個新的元組中。但是,完成整個事情可能有一個完全更好的方法。 – Winter 2012-04-05 12:17:21

+0

你是對的!非常感謝! – Winter 2012-04-05 13:25:00