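I have 2 files of 120 MB each. The data inside each file is completely unstructured, but it follows a common pattern. Because the data structure differs from line-oriented text, my requirement cannot be met by the default LineInputFormat.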





When split was allowed: 
    file 1: split 1 (1st mapper) || split 2 (2nd mapper)------ 2 min 
    file 2: split 1 (1st mapper) || split 2 (2nd mapper)------ 2 min 

    Total Time for reading two files ===== 4 min 

When Split not allowed: 
    file 1: no parallel jobs so (1st mapper)---------4 min 
    file 2: no parallel jobs so (1st mapper)---------4 min 

    Total Time to read two files ===== 8 min (Performance degraded) 

What I want 
    File 1 (1st Mapper) || file 2 (2nd Mapper) ------4 min 

    Total time to read two files ====== 4 min 



Below is my custom InputFormat and custom RecordReader code.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class NSI_inputformatter extends FileInputFormat<NullWritable, Text> {
    // Never split a file, so each file is handed to exactly one mapper.
    @Override
    public boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    // Each (whole-file) split is read by the custom record reader.
    @Override
    public RecordReader<NullWritable, Text> getRecordReader(InputSplit split,
            JobConf job_run, Reporter reporter) throws IOException {
        return new NSI_record_reader(job_run, (FileSplit) split);
    }
}



import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class NSI_record_reader implements RecordReader<NullWritable, Text> {
    private final FileSplit split;
    private final JobConf job_run;
    private boolean processed = false;

    public NSI_record_reader(JobConf job_run, FileSplit split) {
        // Keep the configuration and the split; next() needs both.
        this.job_run = job_run;
        this.split = split;
    }

    // Emits the whole file as a single record on the first call only.
    @Override
    public boolean next(NullWritable key, Text value) throws IOException {
        if (processed) {
            return false;
        }
        byte[] content_add = new byte[(int) split.getLength()];
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job_run);
        FSDataInputStream input = null;
        try {
            // The stream must be opened before reading; it was left null before.
            input = fs.open(file);
            IOUtils.readFully(input, content_add, 0, content_add.length);
            value.set(content_add, 0, content_add.length);
        } finally {
            IOUtils.closeStream(input);
        }
        // Mark the file as consumed so the next call returns false.
        processed = true;
        return true;
    }

    @Override
    public void close() throws IOException {
        // Nothing to do: the stream is closed inside next().
    }

    @Override
    public NullWritable createKey() {
        System.out.println("Inside createKey() method of NSI_record_reader");
        return NullWritable.get();
    }

    @Override
    public Text createValue() {
        System.out.println("Inside createValue() method of NSI_record_reader");
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        System.out.println("Inside getPos() method of NSI_record_reader");
        return processed ? split.getLength() : 0;
    }

    @Override
    public float getProgress() throws IOException {
        System.out.println("Inside getProgress() method of NSI_record_reader");
        return processed ? 1.0f : 0.0f;
    }
}
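
The read-once logic of such a record reader can be tried out without a Hadoop cluster. What follows is only a minimal sketch in plain Java, assuming a local file instead of HDFS; the class name `WholeFileReaderSketch` and the `StringBuilder` value holder are hypothetical stand-ins for the RecordReader and `Text`, not part of the Hadoop API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for a whole-file record reader (not a Hadoop class).
final class WholeFileReaderSketch {
    private final Path file;
    private boolean processed = false;

    WholeFileReaderSketch(Path file) {
        this.file = file;
    }

    // Fills 'value' with the entire file on the first call; returns false afterwards.
    boolean next(StringBuilder value) throws IOException {
        if (processed) {
            return false;
        }
        value.setLength(0);
        value.append(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        processed = true; // without this flag the caller would loop forever
        return true;
    }

    float getProgress() {
        return processed ? 1.0f : 0.0f;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("nsi", ".txt");
        Files.write(tmp, "record-1\nrecord-2\n".getBytes(StandardCharsets.UTF_8));
        WholeFileReaderSketch reader = new WholeFileReaderSketch(tmp);
        StringBuilder value = new StringBuilder();
        // The loop body runs once: the whole file arrives as a single record.
        while (reader.next(value)) {
            System.out.println("got one record of " + value.length() + " chars");
        }
        Files.delete(tmp);
    }
}
```

The point of the sketch is the `processed` flag: it has to be set after the first successful read, otherwise the framework keeps receiving the same record.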



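Have you considered using a custom [inputformat](https://developer.yahoo.com/hadoop/tutorial/module5.html#fileformat)? Could you paste a sample of your input file? That would help decide whether a custom input format would do the trick for you. – Sudarshan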


Here you go: I have shared my custom InputFormat and custom RecordReader code, along with a small chunk of the input file. – Sam


I have built a custom InputFormat that disables splitting, so that each complete file is read by a single mapper. – Sam



You can try setting the property -D mapred.min.split.size=209715200. In that case FileInputFormat should not split the files, because they are smaller than mapred.min.split.size.
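
To see why raising the minimum split size has this effect, here is a rough sketch of the split-size rule the old-API FileInputFormat applies, `max(minSize, min(goalSize, blockSize))`. The class `SplitSizeDemo` is hypothetical, and the 64 MB block size and the goal size (total input divided by one requested map task) are assumptions for illustration, not values taken from the question.

```java
// Sketch only: mirrors the split-size rule, it is not Hadoop's own class.
final class SplitSizeDemo {
    static final long MB = 1024L * 1024L;

    // splitSize = max(minSize, min(goalSize, blockSize))
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    public static void main(String[] args) {
        long fileSize = 120 * MB;
        long blockSize = 64 * MB;     // assumption: HDFS block size
        long goalSize = 2 * fileSize; // assumption: total input / 1 requested map task

        // Default minimum of 1 byte: the block size wins.
        long defaultSplit = computeSplitSize(goalSize, 1L, blockSize);
        // mapred.min.split.size=209715200: the minimum wins.
        long raisedSplit = computeSplitSize(goalSize, 209715200L, blockSize);

        System.out.println("default split size (MB): " + defaultSplit / MB);
        System.out.println("raised split size (MB): " + raisedSplit / MB);
        System.out.println("file fits in one split: " + (fileSize <= raisedSplit));
    }
}
```

With the raised minimum the split size exceeds the 120 MB file size, so each file ends up in a single split without needing a custom `isSplitable` override.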


But that means we would still have only one mapper processing the input, wouldn't it? – Sudarshan


According to the [FileInputSplit source](http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/com.cloudera.hadoop/hadoop-core/0.20.2-737/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapreduce.JobContext%29), if you have two files, FileInputFormat creates at least two splits (one per file), and each split is processed by a separate mapper. –
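
The per-file behaviour can be illustrated with a small simulation. This is a simplified sketch of the counting loop in `getSplits`, assuming non-empty files, a 64 MB default split size, and the 1.1 slop factor used there; the class `PerFileSplitDemo` and its method are hypothetical names, not Hadoop APIs.

```java
// Simplified simulation of how many splits (and thus mappers) a job gets.
final class PerFileSplitDemo {
    static final long MB = 1024L * 1024L;
    static final double SPLIT_SLOP = 1.1; // the last split may be up to 10% larger

    static int countSplits(long[] fileLengths, long splitSize, boolean splitable) {
        int splits = 0;
        // Splits are computed file by file; a split never spans two files.
        for (long length : fileLengths) {
            if (!splitable) {
                splits++; // the whole file becomes one split
                continue;
            }
            long remaining = length;
            while ((double) remaining / splitSize > SPLIT_SLOP) {
                splits++;
                remaining -= splitSize;
            }
            if (remaining != 0) {
                splits++; // the tail of the file
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        long[] files = {120 * MB, 120 * MB};
        System.out.println("splitable, 64 MB splits: " + countSplits(files, 64 * MB, true));
        System.out.println("not splitable: " + countSplits(files, 64 * MB, false));
        System.out.println("min split size 200 MB: " + countSplits(files, 200 * MB, true));
    }
}
```

In both the non-splitable case and the raised-minimum case the two files yield two splits, which is the "file 1 on one mapper, file 2 on another" layout the question asks for.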


Thanks for pointing that out. So, going by the code in FileInputSplit, we should have 2 mappers running, one per input file? ... – Sudarshan