
I want to store some Pig variables into a Hadoop SequenceFile so that I can run an external MapReduce job on them.

Suppose my data has a (chararray, int) schema:

(hello,1) 
(test,2) 
(example,3) 

I wrote this store function:

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

public class StoreTest extends StoreFunc {

    private String storeLocation;
    private RecordWriter writer;
    private Job job;

    public StoreTest() {
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        //return new TextOutputFormat();
        return new SequenceFileOutputFormat();
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        this.storeLocation = location;
        this.job = job;
        System.out.println("Load location is " + storeLocation);
        FileOutputFormat.setOutputPath(job, new Path(location));
        System.out.println("Out path " + FileOutputFormat.getOutputPath(job));
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple tuple) throws IOException {
        try {
            // write each tuple as a (Text, IntWritable) pair
            Text k = new Text((String) tuple.get(0));
            IntWritable v = new IntWritable((Integer) tuple.get(1));
            writer.write(k, v);
        } catch (InterruptedException ex) {
            Logger.getLogger(StoreTest.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

And this Pig script:

register MyUDFs.jar; 
x = load '/user/pinoli/input' as (a:chararray,b:int); 
store x into '/user/pinoli/output/' using StoreTest(); 

However, the store fails and I get this error:

ERROR org.apache.pig.tools.pigstats.PigStats - ERROR 0: java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable 

Is there any way to fix this?

Answer

The problem is that you never set the job's output key/value classes. SequenceFileOutputFormat checks every record against them, and the default output key class is LongWritable, which is exactly what the error message complains about. You can set them in the setStoreLocation() method:

@Override 
public void setStoreLocation(String location, Job job) throws IOException { 
    this.storeLocation = location; 
    this.job = job; 
    this.job.setOutputKeyClass(Text.class); // !!! 
    this.job.setOutputValueClass(IntWritable.class); // !!! 
    ... 

} 
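
Once the classes are set, you can sanity-check from the command line that the output really is a readable SequenceFile, since hadoop fs -text can decode SequenceFiles (the part-* glob below is illustrative; the actual file names depend on the job):

hadoop fs -text /user/pinoli/output/part-*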

I guess you will want to use the storer with different key/value types. In that case, you can pass the types to the constructor, e.g.:

private Class<? extends WritableComparable> keyClass; 
private Class<? extends Writable> valueClass; 
... 

public StoreTest() {...} 

@SuppressWarnings({ "unchecked", "rawtypes" })
public StoreTest(String keyClass, String valueClass) {
    try {
        this.keyClass = (Class<? extends WritableComparable>) Class.forName(keyClass);
        this.valueClass = (Class<? extends Writable>) Class.forName(valueClass);
    } catch (Exception e) {
        throw new RuntimeException("Invalid key/value type", e);
    }
}

... 

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    this.storeLocation = location;
    this.job = job;
    this.job.setOutputKeyClass(keyClass);
    this.job.setOutputValueClass(valueClass);
    ...
}

Then set the proper types in the Pig script (Pig passes the DEFINE arguments to the StoreFunc constructor as strings):

register MyUDFs.jar; 
DEFINE myStorer StoreTest('org.apache.hadoop.io.Text', 'org.apache.hadoop.io.IntWritable'); 
x = load '/user/pinoli/input' as (a:chararray,b:int); 
store x into '/user/pinoli/output/' using myStorer();
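
For completeness, here is a minimal sketch of how the external MapReduce job mentioned in the question could consume the stored data, assuming it just needs to read the (Text, IntWritable) pairs back. The ReadSeqFile and SeqMapper names and the /user/pinoli/mr-output path are made up for illustration, not from the original post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReadSeqFile {

    // identity mapper: receives the (Text, IntWritable) pairs written by the storer
    public static class SeqMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void map(Text key, IntWritable value, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "read-pig-seqfile");
        job.setJarByClass(ReadSeqFile.class);
        job.setInputFormatClass(SequenceFileInputFormat.class); // read the SequenceFile directly
        job.setMapperClass(SeqMapper.class);
        job.setNumReduceTasks(0); // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/user/pinoli/output"));
        FileOutputFormat.setOutputPath(job, new Path("/user/pinoli/mr-output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}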