2013-03-04

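Hadoop: mapper does not read files from multiple input paths

The mappers are not managing to read files from multiple directories. Can anyone help? I need each mapper to read one whole file. I added multiple input paths and implemented a custom WholeFileInputFormat and WholeFileRecordReader. In the map method I don't need the input key. I made sure that each map task reads an entire file.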

Command line:

hadoop jar AutoProduce.jar Autoproduce /input_a /input_b /output

I specified two input paths: 1. input_a; 2. input_b.

Snippet of the run method:

Job job = new Job(getConf()); 
job.setInputFormatClass(WholeFileInputFormat.class); 
FileInputFormat.setInputPaths(job, new Path(args[0]), new Path(args[1])); 
FileOutputFormat.setOutputPath(job, new Path(args[2])); 
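
The run method above picks its two input paths and one output path purely by position in args. As a hedged alternative, the sketch below treats the last argument as the output path and every preceding argument as an input path, so the parsed list can be printed and checked before the job is configured. It is plain Java with no Hadoop dependency; the class `JobArgs` and its `parse` method are hypothetical names that do not appear in the original code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits command-line arguments into input paths and one output path.
final class JobArgs {
    final List<String> inputs;
    final String output;

    private JobArgs(List<String> inputs, String output) {
        this.inputs = inputs;
        this.output = output;
    }

    // The last argument is the output path; everything before it is an input path.
    static JobArgs parse(String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException("usage: <input>... <output>");
        }
        List<String> inputs = Arrays.asList(Arrays.copyOfRange(args, 0, args.length - 1));
        return new JobArgs(inputs, args[args.length - 1]);
    }

    public static void main(String[] args) {
        JobArgs parsed = parse(new String[] {"/input_a", "/input_b", "/output"});
        // prints: inputs=[/input_a, /input_b] output=/output
        System.out.println("inputs=" + parsed.inputs + " output=" + parsed.output);
    }
}
```

Each entry of `inputs` could then be wrapped in a Path and passed to FileInputFormat.setInputPaths, which accepts a variable number of paths. Printing the parsed values first makes it easy to confirm that the job really receives both input directories.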

Snippet of the map method:

public void map(NullWritable key, BytesWritable value, Context context){ 
    FileSplit fileSplit = (FileSplit) context.getInputSplit(); 
    System.out.println("Directory :" + fileSplit.getPath().toString()); 
    ...... 
} 
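
The map method above prints the path of every split it receives. To check which directories the map tasks actually covered, those printed paths can be tallied by parent directory. The following is a minimal sketch in plain Java, with no Hadoop dependency; the class `DirectoryTally`, the method `countByParent`, and the sample paths (including the host name) are hypothetical and only serve as an illustration.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical debugging helper: counts how many file paths fall under each parent directory.
final class DirectoryTally {
    static Map<String, Integer> countByParent(List<String> filePaths) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String p : filePaths) {
            // Drop the scheme and authority (e.g. hdfs://host:port) and keep the path part.
            // URI.create assumes the path has no spaces or other characters illegal in a URI.
            String path = URI.create(p).getPath();
            int slash = path.lastIndexOf('/');
            String parent = slash <= 0 ? "/" : path.substring(0, slash);
            counts.merge(parent, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample values standing in for the lines logged by the map tasks.
        List<String> seen = Arrays.asList(
                "hdfs://namenode:8020/input_a/part-1",
                "hdfs://namenode:8020/input_a/part-2",
                "hdfs://namenode:8020/input_b/part-1");
        // prints: {/input_a=2, /input_b=1}
        System.out.println(countByParent(seen));
    }
}
```

If only one directory shows up in the tally, the other input path never reached the job's split computation, which narrows the search to the driver and the command line rather than the record reader.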

Custom WholeFileInputFormat:

class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

Custom WholeFileRecordReader:

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing
    }
}
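
The record reader above turns a whole file into one record by allocating a buffer of the file's length and filling it completely. The sketch below illustrates those two steps in plain Java as a stand-in, not as Hadoop's own implementation: a read loop that keeps going until the buffer is full, and a length check that guards the `(int)` cast, since a length above Integer.MAX_VALUE cannot be held in a single byte array. The class `ReadFullySketch` and its method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the "read the whole file into one buffer" step.
final class ReadFullySketch {
    // Fills buf[off .. off+len) from the stream; loops because a single read()
    // call may return fewer bytes than requested.
    static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
        int done = 0;
        while (done < len) {
            int n = in.read(buf, off + done, len - done);
            if (n < 0) {
                throw new EOFException("stream ended after " + done + " of " + len + " bytes");
            }
            done += n;
        }
    }

    // Rejects lengths that a plain (int) cast would silently truncate.
    static int checkedLength(long fileLength) {
        if (fileLength < 0 || fileLength > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("length does not fit in one byte array: " + fileLength);
        }
        return (int) fileLength;
    }

    public static void main(String[] args) throws IOException {
        // An in-memory stream stands in for the opened file.
        byte[] source = "whole file contents".getBytes(StandardCharsets.UTF_8);
        byte[] contents = new byte[checkedLength(source.length)];
        readFully(new ByteArrayInputStream(source), contents, 0, contents.length);
        // prints: whole file contents
        System.out.println(new String(contents, StandardCharsets.UTF_8));
    }
}
```

A check like `checkedLength` makes the size limit of the whole-file approach explicit: without it, an oversized file would produce a wrong buffer size instead of a clear error.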

Problem:

After setting two input paths, all map tasks read files from only one directory.

Thanks in advance.

Answer


You need to use MultipleInputs in the driver instead of FileInputFormat. Your code should then look like this:

MultipleInputs.addInputPath(job, new Path(args[0]), <Input_Format_Class_1>); 
MultipleInputs.addInputPath(job, new Path(args[1]), <Input_Format_Class_2>); 
. 
. 
. 
MultipleInputs.addInputPath(job, new Path(args[N-1]), <Input_Format_Class_N>); 

So, if you want to use WholeFileInputFormat for the first input path and TextInputFormat for the second input path, you would use it as follows:

MultipleInputs.addInputPath(job, new Path(args[0]), WholeFileInputFormat.class); 
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class); 

Hope this works for you!

+1. I wonder whether an InputFormatClass should still be set on the job, since an InputFormatClass is already given alongside each individual input. – 2013-07-30 09:19:45
