2013-02-06

Hadoop Pig: output directory not set

I am writing my own Pig store class. I don't want to store the output in a file; I plan to send it to a third-party data store (the actual API calls are still missing).

Note: I am running this on Cloudera's VirtualBox image.

I wrote my Java class (see below), created mystore.jar, and I use it from the following id.pig script:

store B INTO 'mylocation' USING MyStore('mynewlocation') 

While running this script with Pig, I see the following error:

ERROR 6000: Output location validation failed for: 'file://home/cloudera/test/id.out More info to follow: Output directory not set.

org.apache.pig.impl.plan.VisitorException: ERROR 6000: 
at org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95) 

Please help!

-------------------- MyStore.java ----------------------

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

public class MyStore extends StoreFunc {
    protected RecordWriter writer = null;
    private String location = null;

    public MyStore() {
        location = null;
    }

    public MyStore(String location) {
        this.location = location;
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new MyStoreOutputFormat(location);
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple tuple) throws IOException {
        // write tuple to location
        try {
            writer.write(null, tuple.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        if (location != null)
            this.location = location;
    }
}

-------------------- MyStoreOutputFormat.java ----------------------

import java.io.DataOutputStream; 
import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.pig.data.Tuple; 

public class MyStoreOutputFormat extends
        TextOutputFormat<WritableComparable, Tuple> {
    private String location = null;

    public MyStoreOutputFormat(String location) {
        this.location = location;
    }

    @Override
    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {

        Configuration conf = job.getConfiguration();

        String extension = location;
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);

        FSDataOutputStream fileOut = fs.create(file, false);

        return new MyStoreRecordWriter(fileOut);
    }

    protected static class MyStoreRecordWriter extends
            RecordWriter<WritableComparable, Tuple> {

        DataOutputStream out = null;

        public MyStoreRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        @Override
        public void close(TaskAttemptContext taskContext) throws IOException,
                InterruptedException {
            // close the location
        }

        @Override
        public void write(WritableComparable key, Tuple value)
                throws IOException, InterruptedException {

            // write the data to location
            if (out != null) {
                out.writeChars(value.toString()); // will be calling API later. let me first dump to the location!
            }
        }
    }
}

Am I missing something?

Comment: Please help, I need this urgently. Thanks!

Answer

First, I think you should store the location value in the job Configuration rather than in an instance variable.

The instance variable 'location' is set when setStoreLocation is called while your job is being planned, but the getOutputFormat call may not happen until the execution phase, by which time the location variable may no longer be set (a new instance of your class may have been created).
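The instance-lifetime problem can be demonstrated without Hadoop at all. In this plain-Java sketch (all class and key names are hypothetical, chosen only for illustration), the front end sets a field on one instance, but the back end gets a freshly constructed instance, so the field is gone unless the value was also written into a shared configuration map, which plays the role of Hadoop's job Configuration:

```java
import java.util.HashMap;
import java.util.Map;

public class InstanceLifetimeDemo {
    // Stand-in for Hadoop's job Configuration, which IS shipped to the tasks.
    static Map<String, String> jobConf = new HashMap<>();

    static class MyStoreLike {
        private String location = null; // instance field: not shipped anywhere

        void setStoreLocation(String location) {
            this.location = location;            // lost when a new instance is made
            jobConf.put("mylocation", location); // survives: the conf is shared
        }

        String locationFromField() { return location; }
        String locationFromConf()  { return jobConf.get("mylocation"); }
    }

    public static void main(String[] args) {
        MyStoreLike frontEnd = new MyStoreLike();
        frontEnd.setStoreLocation("mynewlocation"); // called at plan time

        // At execution time the framework may instantiate the class again:
        MyStoreLike backEnd = new MyStoreLike();
        System.out.println(backEnd.locationFromField()); // null
        System.out.println(backEnd.locationFromConf());  // mynewlocation
    }
}
```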

If you look at the source for PigStorage.setStoreLocation, you'll notice that they store the location in the job Configuration (2nd line):

@Override 
public void setStoreLocation(String location, Job job) throws IOException { 
    job.getConfiguration().set("mapred.textoutputformat.separator", "");
    FileOutputFormat.setOutputPath(job, new Path(location));

    if ("true".equals(job.getConfiguration().get("output.compression.enabled"))) {
        FileOutputFormat.setCompressOutput(job, true);
        String codec = job.getConfiguration().get("output.compression.codec");
        try {
            FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(codec));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found: " + codec);
        }
    } else {
        // This makes it so that storing to a directory ending with ".gz" or ".bz2" works.
        setCompression(new Path(location), job);
    }
}

So I think you should store the location in the job's Configuration instead:

@Override 
public void setStoreLocation(String location, Job job) throws IOException { 
    if (location != null)
        job.getConfiguration().set("mylocation", location);
}

Your custom output format can then extract it in the getRecordWriter method:

@Override 
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
        TaskAttemptContext job) throws IOException, InterruptedException {

    Configuration conf = job.getConfiguration();

    String extension = conf.get("mylocation");
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);

    FSDataOutputStream fileOut = fs.create(file, false);

    return new MyStoreRecordWriter(fileOut);
}

Finally (and this is probably the actual cause of the error you're seeing), your output format extends TextOutputFormat, and your record writer uses the getDefaultWorkFile method. That method needs to know where in HDFS the file should be written, but you never call FileOutputFormat.setOutputPath(job, new Path(location)); in your setStoreLocation method (see the PigStorage.setStoreLocation method I pasted earlier). The error occurs because Pig doesn't know where to create the default work file.

Comment: Thanks Chris. I had missed the "FileOutputFormat.setOutputPath(job, new Path(location));" call. Changed my code based on your input.