
This is actually a follow-up to the question How can I add row numbers for rows in PIG or HIVE? (error in a Pig UDF).

The answer provided by Srini there works fine, but I am having trouble accessing the data after the UDF.

The UDF provided by Srini is the following:

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class RowCounter extends EvalFunc<DataBag> {
    TupleFactory mTupleFactory = TupleFactory.getInstance();
    BagFactory mBagFactory = BagFactory.getInstance();

    // Appends a 1-based row number to every tuple of the input bag.
    public DataBag exec(Tuple input) throws IOException {
        try {
            DataBag output = mBagFactory.newDefaultBag();
            DataBag bg = (DataBag) input.get(0);
            Iterator<Tuple> it = bg.iterator();
            int count = 1;
            while (it.hasNext()) {
                Tuple t = it.next();
                t.append(count);
                output.add(t);
                count = count + 1;
            }
            return output;
        } catch (ExecException ee) {
            // error handling goes here
            throw ee;
        }
    }

    public Schema outputSchema(Schema input) {
        try {
            // Declares only a bag named "RowCounter"; the fields of the
            // tuples inside the bag are not described.
            Schema bagSchema = new Schema();
            bagSchema.add(new Schema.FieldSchema("RowCounter", DataType.BAG));
            return new Schema(new Schema.FieldSchema(
                    getSchemaName(this.getClass().getName().toLowerCase(), input),
                    bagSchema, DataType.BAG));
        } catch (Exception e) {
            return null;
        }
    }
}
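For completeness, the compiled class has to be packaged and registered before the script below can call it; a minimal sketch (the jar name is an assumption, the package must match myudfs.RowCounter):

REGISTER myudfs.jar;   -- assumed jar name containing the compiled myudfs.RowCounter class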

I wrote a simple Pig test script as follows:

A = load 'input.txt' using PigStorage(' ') as (name:chararray, age:int); 
/* 
--A: {name: chararray,age: int} 
(amy,56) 
(bob,1) 
(bob,9) 
(amy,34) 
(bob,20) 
(amy,78) 
*/ 
B = group A by name; 
C = foreach B { 
    orderedGroup = order A by age; 
    generate myudfs.RowCounter(orderedGroup) as t; 
} 
/* 
--C: {t: {(RowCounter: {})}} 
({(amy,34,1),(amy,56,2),(amy,78,3)}) 
({(bob,1,1),(bob,9,2),(bob,20,3)}) 
*/ 
D = foreach C generate FLATTEN(t); 
/* 
D: {t::RowCounter: {}} 
(amy,34,1) 
(amy,56,2) 
(amy,78,3) 
(bob,1,1) 
(bob,9,2) 
(bob,20,3) 
*/ 

The question is how to use D in later operations. I have tried several approaches, but I always get the following error:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.DataBag 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:575) 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:248) 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:316) 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:332) 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:284) 
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:459) 
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:427) 
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:407) 
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:261) 
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176) 
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256) 

My guess is that this happens because there is no schema for the tuples inside the bag. If that is the cause, how should I modify the UDF?
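For reference, the missing schema can also be declared by hand on the Pig side; a minimal sketch, reusing the field names from the script above plus an assumed alias counter for the appended column:

-- sketch only: declare the tuple schema explicitly while flattening (aliases assumed)
D = foreach C generate FLATTEN(t) as (name:chararray, age:int, counter:int);
E = order D by counter;   -- later operations now work because D has a schema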


OK, I found the solution – user1591487 2012-08-14 01:18:59

Answers


OK, I fixed it by rewriting outputSchema as follows:

public Schema outputSchema(Schema input) {
    try {
        // Take the schema of the tuples inside the input bag and append an
        // integer "counter" field for the row number.
        Schema.FieldSchema counter = new Schema.FieldSchema("counter", DataType.INTEGER);
        Schema tupleSchema = new Schema(input.getField(0).schema.getField(0).schema.getFields());
        tupleSchema.add(counter);

        Schema.FieldSchema tupleFs;
        tupleFs = new Schema.FieldSchema("with_counter", tupleSchema, DataType.TUPLE);

        Schema bagSchema = new Schema(tupleFs);
        return new Schema(new Schema.FieldSchema("row_counter",
                bagSchema, DataType.BAG));
    } catch (Exception e) {
        return null;
    }
}
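With this schema in place, the flattened relation D carries named fields and can be used in later statements. A short usage sketch; the exact t:: prefixes depend on the alias chosen in C, so treat the names as assumptions:

-- describe D should now report something like:
--   D: {t::name: chararray, t::age: int, t::counter: int}
E = foreach D generate t::name, t::counter;
F = filter D by t::counter == 1;   -- e.g. keep only the first row of each group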

Thanks for finding the solution.