
Please go easy on me, as I have only three months of experience with Hadoop and MapReduce. How can I read two files in parallel, with two map tasks running at the same time?

I have two files of 120 MB each, and the data inside each file is completely unstructured but shares a common pattern. Because of the way the data is structured, my requirement cannot be met by the default LineInputFormat.

So when reading the files I overrode the isSplitable() method and stopped the splitting by returning false. That way one mapper gets access to a complete file, I can apply my logic, and the requirement is met.

My machine can run two mappers in parallel, so by stopping the split I degraded performance: the mappers now run over the files one by one, instead of two mappers running in parallel over one file.

My question is: how do I run two mappers in parallel, one for each file, to improve performance?

For example:

When split was allowed: 
    file 1: split 1 (1st mapper) || split 2 (2nd mapper)------ 2 min 
    file 2: split 1 (1st mapper) || split 2 (2nd mapper)------ 2 min 

    Total Time for reading two files ===== 4 min 

When Split not allowed: 
    file 1: no parallel jobs so (1st mapper)---------4 min 
    file 2: no parallel jobs so (1st mapper)---------4 min 

    Total Time to read two files ===== 8 min (Performance degraded) 

What I want 
    File 1 (1st Mapper) || file 2 (2nd Mapper) ------4 min 

    Total time to read two files ====== 4 min 

Basically, I want the two files to be read at the same time by two different mappers.

Please help me implement this scenario.

Below is my custom InputFormat and custom RecordReader code.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class NSI_inputformatter extends FileInputFormat<NullWritable, Text> {

    // Never split a file: each input file is handed whole to one mapper.
    @Override
    public boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, Text> getRecordReader(InputSplit split,
            JobConf job_run, Reporter reporter) throws IOException {
        return new NSI_record_reader(job_run, (FileSplit) split);
    }
}

The record reader:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class NSI_record_reader implements RecordReader<NullWritable, Text> {

    private final FileSplit split;
    private final JobConf job_run;
    private boolean processed = false;

    public NSI_record_reader(JobConf job_run, FileSplit split) {
        this.split = split;
        this.job_run = job_run;
    }

    // Emits exactly one record: the entire file contents as the value.
    @Override
    public boolean next(NullWritable key, Text value) throws IOException {
        if (!processed) {
            byte[] content_add = new byte[(int) split.getLength()];
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job_run);
            FSDataInputStream input = null;
            try {
                input = fs.open(file);
                IOUtils.readFully(input, content_add, 0, content_add.length);
                value.set(content_add, 0, content_add.length);
            } finally {
                IOUtils.closeStream(input);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public NullWritable createKey() {
        return NullWritable.get();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return processed ? split.getLength() : 0;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }
}
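
For completeness, here is a minimal sketch of how I wire this up in a driver (the class name, job name, and paths are illustrative, and the old-API default IdentityMapper stands in for my real map logic):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class NSI_driver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(NSI_driver.class);
        conf.setJobName("nsi-whole-file-read");

        // The custom format turns each input file into exactly one split,
        // so two input files yield two map tasks.
        conf.setInputFormat(NSI_inputformatter.class);

        // Match the key/value types produced by NSI_record_reader.
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(0); // map-only job for this sketch

        // args[0] is a directory containing both 120 MB files.
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

Note that even with one split per file, whether the two map tasks actually overlap depends on the map slots the cluster exposes (on a single-node Hadoop 1.x setup, mapred.tasktracker.map.tasks.maximum, which defaults to 2).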

Sample input:

<Dec 12, 2013 1:05:56 AM CST> <Error> <HTTP> <BEA-101017>  <[[email protected] - appName: 'Agile', name: '/Agile', context-path: '/Agile', spec-version: 'null'] Root cause of ServletException. 
    javax.servlet.jsp.JspException: Connection reset by peer: socket write error 
at com.agile.ui.web.taglib.common.FormTag.writeFormHeader(FormTag.java:498) 
at com.agile.ui.web.taglib.common.FormTag.doStartTag(FormTag.java:429) 
at jsp_servlet._default.__login_45_cms._jspService(__login_45_cms.java:929) 
at weblogic.servlet.jsp.JspBase.service(JspBase.java:34) 
at weblogic.servlet.internal.StubSecurityHelper$ServletServiceAction.run(StubSecurityHelper.java:227) 
Truncated. see log file for complete stacktrace 
> 
Retrieving the value for the attribute Page Two.Validation Status for the Object 769630 
Retrieving the value for the attribute Page Two.Pilot Required for the Object 769630 
Retrieving the value for the attribute Page Two.NPO Contact for the Object 769630 
<Dec 12, 2013 1:12:13 AM CST> <Warning> <Socket> <BEA-000449> <Closing socket as no data read from it during the configured idle timeout of 0 secs> 

Thanks.

Have you considered using a custom [inputformat](https://developer.yahoo.com/hadoop/tutorial/module5.html#fileformat)? Could you paste a sample of your input file? That would help decide whether a custom input format will do the trick for you. – Sudarshan

Here you go: I have shared my custom inputformat and custom recordreader code along with a small chunk of the input file. – Sam

I have built a custom InputFormat that stops the split, so my complete file can be read by one mapper. – Sam

Answer

You can try setting the property `-D mapred.min.split.size=209715200`. In that case FileInputFormat should not split the files, because they are smaller than `mapred.min.split.size`.
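
For example, a minimal sketch of the same setting applied in driver code (the class and method names here are just placeholders):

import org.apache.hadoop.mapred.JobConf;

public class SplitSizeExample {
    // Raise the minimum split size above the input file size so that
    // FileInputFormat keeps each 120 MB file as a single split.
    static void keepFilesWhole(JobConf conf) {
        conf.setLong("mapred.min.split.size", 209715200L); // 200 MB
    }
}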

But that would mean we still have just a single mapper processing the input, wouldn't it? – Sudarshan

Judging from the [FileInputFormat source](http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/com.cloudera.hadoop/hadoop-core/0.20.2-737/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapreduce.JobContext%29), if you have two files then FileInputFormat creates at least two splits (one per file), and each split will be processed by a separate mapper. –
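
For reference, the old-API FileInputFormat sizes its splits with essentially the following rule (a paraphrase, not the verbatim source), and splits never cross file boundaries, which is why two files always yield at least two splits:

// Paraphrase of FileInputFormat's split sizing in the old mapred API:
// goalSize = totalInputBytes / requestedNumberOfSplits,
// minSize  = mapred.min.split.size,
// blockSize = the file's HDFS block size.
static long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
}

With minSize raised to 200 MB, the computed split size exceeds each 120 MB file's length, so every file comes back as one split, and one mapper per file is exactly what the question asked for.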

Thanks for pointing that out. However, just going by that code in FileInputFormat, we should have two mappers running, one per input file, shouldn't we? ... – Sudarshan