2013-01-11 38 views
5

有人可以指出我在哪裏可以找到CombineFileInputFormat的組件實現(組織使用Hadoop 0.20.205?這是使用EMR從非常小的日誌文件(文本行)創建大分割CombineFileInputFormat的實現Hadoop 0.20.205

令人驚訝的是,Hadoop沒有爲這個目的而專門設計的這個類的默認實現,並且它看起來像我並不是唯一一個被這個困惑的我需要編譯這個類並捆綁它一個用於hadoop流的jar,對Java有限的知識這是一些挑戰。

編輯: 我已經嘗試了使用neitrails的例子,子宮頸進口,但我得到一個編譯器錯誤的下一個方法。

+0

你跟雪人足跡例子得到什麼錯誤?對我來說似乎沒問題。 –

+0

你見過這個:http://yaseminavcular.blogspot.com/2011/03/many-small-input-files.html –

+0

@Charles,謝謝,我得到的錯誤是「錯誤:MyKeyValueLineRecordReader不是抽象的不要在RecordReader中覆蓋抽象方法next(Object,Object)「 –

回答

14

這裏是我對你的實現:

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.LineRecordReader; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.lib.CombineFileInputFormat; 
import org.apache.hadoop.mapred.lib.CombineFileRecordReader; 
import org.apache.hadoop.mapred.lib.CombineFileSplit; 

@SuppressWarnings("deprecation") 
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> { 

    @SuppressWarnings({ "unchecked", "rawtypes" }) 
    @Override 
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException { 

     return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class); 
    } 

    public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> { 
     private final LineRecordReader linerecord; 

     public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException { 
      FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations()); 
      linerecord = new LineRecordReader(conf, filesplit); 
     } 

     @Override 
     public void close() throws IOException { 
      linerecord.close(); 

     } 

     @Override 
     public LongWritable createKey() { 
      // TODO Auto-generated method stub 
      return linerecord.createKey(); 
     } 

     @Override 
     public Text createValue() { 
      // TODO Auto-generated method stub 
      return linerecord.createValue(); 
     } 

     @Override 
     public long getPos() throws IOException { 
      // TODO Auto-generated method stub 
      return linerecord.getPos(); 
     } 

     @Override 
     public float getProgress() throws IOException { 
      // TODO Auto-generated method stub 
      return linerecord.getProgress(); 
     } 

     @Override 
     public boolean next(LongWritable key, Text value) throws IOException { 

      // TODO Auto-generated method stub 
      return linerecord.next(key, value); 
     } 

    } 
} 

在你的工作,首先設置參數mapred.max.split.size根據你想輸入的文件組合成規模。做一些你運行類似如下()

... 
      if (argument != null) { 
       conf.set("mapred.max.split.size", argument); 
      } else { 
       conf.set("mapred.max.split.size", "134217728"); // 128 MB 
      } 
... 

      conf.setInputFormat(CombinedInputFormat.class); 
... 
+0

整潔,編譯沒問題,謝謝Amar! –

+0

嗨@Amar,索引從哪裏來? – learninghuman

+0

@ManikandanKannan:使用此InputFormat時,您無需擔心索引。爲了使用它,只需執行'conf.setInputFormat(CombinedInputFormat.class);'如上所示。但FYI指數可能是在通過hadoop在內部初始化記錄讀取器時傳遞的拆分數字。 – Amar