2012-04-30 32 views
2

如何在Hadoop中使用CombineFileInputFormat？我想使用Hadoop 0.20.0/0.20.2的CombineFileInputFormat，使每條記錄對應處理1個文件，同時不破壞數據局部性（CombineFileInputFormat通常會兼顧這一點）。

它在湯姆懷特的Hadoop權威指南中提到,但他沒有展示如何去做。相反,他轉向序列文件。

我對記錄閱讀器中處理變量的含義非常困惑。任何代碼示例都會有很大的幫助。

預先感謝..

+0

你能詳細說明你的意思嗎?每個記錄有一個文件? –

回答

1

請參考下面這個用於把多個小文件組合成分割（split）的輸入格式（InputFormat）實現。

import java.io.IOException; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; 


/** 
* CustomInputformat which implements the createRecordReader of abstract class CombineFileInputFormat 
*/ 

/**
 * Input format that packs many small files into combined splits while still
 * emitting one (LongWritable byte-offset, Text line) record per line, by
 * delegating the actual reading of each underlying file to a LineRecordReader.
 */
public class MyCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    /**
     * Reads the idx-th file inside a CombineFileSplit by wrapping a LineRecordReader.
     *
     * <p>CombineFileRecordReader instantiates this class reflectively, which is why
     * the constructor must keep exactly the
     * (CombineFileSplit, TaskAttemptContext, Integer) signature.
     */
    public static class MyRecordReader extends RecordReader<LongWritable, Text> {
        // Underlying line reader bound to the single file selected by idx.
        private final LineRecordReader delegate;
        // Position of this reader's file within the combined split.
        private final int idx;

        public MyRecordReader(CombineFileSplit split, TaskAttemptContext taskcontext, Integer idx)
                throws IOException {
            this.idx = idx;
            this.delegate = new LineRecordReader();
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext taskcontext)
                throws IOException, InterruptedException {
            // Carve the idx-th file out of the combined split and hand it to the
            // line reader as an ordinary single-file FileSplit.
            CombineFileSplit csplit = (CombineFileSplit) split;
            FileSplit fileSplit = new FileSplit(
                    csplit.getPath(idx),
                    csplit.getOffset(idx),
                    csplit.getLength(idx),
                    csplit.getLocations());
            delegate.initialize(fileSplit, taskcontext);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return delegate.nextKeyValue();
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return delegate.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return delegate.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            // Propagate failures instead of silently returning 0: the RecordReader
            // contract declares these exceptions and the framework handles them.
            return delegate.getProgress();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }

    /**
     * Returns a CombineFileRecordReader that drives one MyRecordReader per
     * file contained in the combined split.
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext taskcontext) throws IOException {
        return new CombineFileRecordReader<LongWritable, Text>(
                (CombineFileSplit) split, taskcontext, MyRecordReader.class);
    }
}
0

以下是從所謂的「新API」中使用CombineFileInputFormat的最簡單方法。假設您實際的輸入格式爲MyFormat，其鍵類型爲MyKey、值類型爲MyValue（例如，它可能是SequenceFileInputFormat&lt; MyKey, MyValue &gt;的某個子類）。

/**
 * Drop-in replacement for MyFormat that groups many small input files into
 * combined splits. Key/value types remain MyKey/MyValue; per-file reading is
 * delegated back to a fresh MyFormat instance.
 */
public class CombinedMyFormat extends CombineFileInputFormat<MyKey, MyValue> {

    /**
     * Reflectively-instantiated wrapper that pins the generic
     * CombineFileRecordReaderWrapper to MyFormat's key/value types and supplies
     * the delegate format to the superclass. If MyFormat carries no state, a
     * shared constant instance could be used instead of new MyFormat() per file.
     */
    private static class CombineMyKeyMyValueReaderWrapper
            extends CombineFileRecordReaderWrapper<MyKey, MyValue> {
        protected CombineMyKeyMyValueReaderWrapper(
                CombineFileSplit split, TaskAttemptContext ctx, Integer idx)
                throws IOException, InterruptedException {
            super(new MyFormat(), split, ctx, idx);
        }
    }

    @Override
    public RecordReader<MyKey, MyValue> createRecordReader(
            InputSplit split, TaskAttemptContext ctx) throws IOException {
        return new CombineFileRecordReader<MyKey, MyValue>(
                (CombineFileSplit) split, ctx, CombineMyKeyMyValueReaderWrapper.class);
    }
}

在作業的驅動程序中，您現在應該可以直接用CombinedMyFormat替換MyFormat。您還應該設置max split size屬性（mapreduce.input.fileinputformat.split.maxsize），以防止Hadoop將整個輸入合併爲一個分割。