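Please go easy on me, as I have only 3 months of experience with Hadoop and MapReduce. How can I read two files in parallel, with two map tasks running in parallel?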
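I have 2 files of 120 MB each. The data in each file is completely unstructured but follows a common pattern, and because of that structure my requirement cannot be met by the default line-based input format (`TextInputFormat`).

So I overrode the `isSplitable()` method to return `false`, which stops the files from being split. That way one mapper gets a complete file, and I can apply my logic and meet the requirement.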
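The timings in the question assume two splits per 120 MB file. That is what Hadoop's `FileInputFormat` produces if the block size is 64 MB (the Hadoop 1.x default; Hadoop 2.x defaults to 128 MB): it cuts a file into block-sized splits, lets the last one be up to 10% larger, and starts one map task per split. With `isSplitable()` returning `false`, each file becomes a single split, so two files still give two map tasks. The stdlib-only model below reproduces that arithmetic. It is a simplified sketch, not Hadoop's code, and assumes the split size equals the block size (min/max split size left at their defaults) and a non-empty file; `SplitModel` and `splitLengths` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of FileInputFormat-style split planning (assumption:
// split size == block size). Each returned split would become one map task.
public class SplitModel {
    // The last split may be up to 10% larger than splitSize instead of
    // producing a tiny extra split.
    private static final double SPLIT_SLOP = 1.1;

    public static List<Long> splitLengths(long fileLength, long splitSize, boolean splittable) {
        List<Long> splits = new ArrayList<>();
        if (!splittable) {
            splits.add(fileLength); // whole file in one split -> one mapper
            return splits;
        }
        long remaining = fileLength;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits.add(remaining); // tail split, at most 1.1 * splitSize
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(splitLengths(120 * mb, 64 * mb, true).size());  // 2 (64 MB + 56 MB)
        System.out.println(splitLengths(120 * mb, 128 * mb, true).size()); // 1
        System.out.println(splitLengths(120 * mb, 64 * mb, false).size()); // 1
    }
}
```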
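My machine can run two mappers in parallel. By preventing splitting, however, I have reduced performance: the files are now processed one after the other, each by a single mapper, instead of two mappers working on each file in parallel.

My question is: how can I run two mappers in parallel, one per file, to improve performance?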
For example:

```
When splitting was allowed:
file 1: split 1 (1st mapper) || split 2 (2nd mapper) ------ 2 min
file 2: split 1 (1st mapper) || split 2 (2nd mapper) ------ 2 min
Total time to read two files ===== 4 min

When splitting was not allowed:
file 1: no parallel jobs, so (1st mapper) --------- 4 min
file 2: no parallel jobs, so (1st mapper) --------- 4 min
Total time to read two files ===== 8 min (performance degraded)

What I want:
file 1 (1st mapper) || file 2 (2nd mapper) ------ 4 min
Total time to read two files ====== 4 min
```
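With splitting disabled there is one map task per file, two in total, so the InputFormat already produces the "What I want" layout; whether the two tasks actually overlap is decided by the scheduler and the number of free map slots. A toy model of a fixed number of slots (each task goes to the slot that frees up first) reproduces the three timings in the question: two whole-file tasks of about 4 min on two slots finish in about 4 min, and 8 min is what the model gives with a single slot. An 8-minute run therefore suggests only one map task was running at a time, for example because the job ran through the local job runner (which in Hadoop 1.x executes map tasks one after another) or because only one map slot was available (in Hadoop 1.x, `mapred.tasktracker.map.tasks.maximum`, default 2 per TaskTracker). The durations come from the question; the scheduling rule and the names `SlotModel` and `makespan` are assumptions for illustration.

```java
import java.util.PriorityQueue;

// Toy model: a fixed number of map slots, each task assigned to the slot
// that becomes free first (greedy list scheduling). Times are in minutes.
public class SlotModel {

    // Returns the time at which the last task finishes.
    public static int makespan(int[] taskMinutes, int slots) {
        if (slots <= 0) {
            throw new IllegalArgumentException("slots must be positive");
        }
        PriorityQueue<Integer> freeAt = new PriorityQueue<>();
        for (int i = 0; i < slots; i++) {
            freeAt.add(0); // every slot is idle at time 0
        }
        int finish = 0;
        for (int minutes : taskMinutes) {
            int start = freeAt.poll(); // earliest-available slot
            int end = start + minutes;
            freeAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }

    public static void main(String[] args) {
        // Splitting allowed: 2 files x 2 splits of about 2 min each, 2 slots.
        System.out.println(makespan(new int[] {2, 2, 2, 2}, 2)); // 4
        // Splitting disabled: 2 whole-file tasks of about 4 min each, 2 slots.
        System.out.println(makespan(new int[] {4, 4}, 2));       // 4
        // Same two tasks when only one slot is used at a time.
        System.out.println(makespan(new int[] {4, 4}, 1));       // 8
    }
}
```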
Basically, I want the two files to be read at the same time by two different mappers.
Please help me implement this scenario.
Below are my custom InputFormat and custom RecordReader code.
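```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class NSI_inputformatter extends FileInputFormat<NullWritable, Text> {
    // Never split a file: each input file becomes exactly one split, i.e. one map task.
    @Override
    public boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    // One NSI_record_reader per split (here: per whole file).
    @Override
    public RecordReader<NullWritable, Text> getRecordReader(InputSplit split,
            JobConf job_run, Reporter reporter) throws IOException {
        return new NSI_record_reader(job_run, (FileSplit) split);
    }
}
```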
Record reader:
```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

// Emits the whole file as a single (NullWritable, Text) record.
public class NSI_record_reader implements RecordReader<NullWritable, Text> {
    FileSplit split;
    JobConf job_run;
    public boolean processed = false;

    public NSI_record_reader(JobConf job_run, FileSplit split) {
        this.split = split;
        this.job_run = job_run;
    }

    @Override
    public boolean next(NullWritable key, Text value) throws IOException {
        if (processed) {
            return false; // the single record has already been returned
        }
        // The split covers the whole file because isSplitable() returns false.
        byte[] content_add = new byte[(int) split.getLength()];
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job_run);
        FSDataInputStream input = null;
        try {
            input = fs.open(file);
            IOUtils.readFully(input, content_add, 0, content_add.length);
            value.set(content_add, 0, content_add.length);
        } finally {
            IOUtils.closeStream(input);
        }
        processed = true;
        return true;
    }

    @Override
    public void close() throws IOException {
        // Nothing to release: the input stream is already closed in next().
    }

    @Override
    public NullWritable createKey() {
        return NullWritable.get();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return processed ? split.getLength() : 0;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }
}
```
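`NSI_record_reader` sizes its buffer with `(int) split.getLength()`. That is fine for 120 MB files, but for a file over 2 GB the cast wraps around: the result can be negative (`NegativeArraySizeException`) or a smaller positive size, in which case only part of the file is read. The sketch below shows the same read-the-whole-file-into-one-record step with an explicit size check. It uses `java.nio.file` on local files as a stand-in for HDFS, and `WholeFileRead`, `checkedSize` and `readWhole` are hypothetical names.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class WholeFileRead {
    // Conservative limit: some JVMs refuse arrays of exactly Integer.MAX_VALUE elements.
    private static final long MAX_RECORD_BYTES = Integer.MAX_VALUE - 8;

    // Converts a file length to a byte[] size, refusing lengths that do not fit.
    public static int checkedSize(long length) {
        if (length < 0 || length > MAX_RECORD_BYTES) {
            throw new IllegalArgumentException("file too large for one byte[] record: " + length);
        }
        return (int) length;
    }

    // Reads the whole file into one buffer; the stream is closed in all cases.
    public static byte[] readWhole(Path file) throws IOException {
        byte[] content = new byte[checkedSize(Files.size(file))];
        try (DataInputStream in = new DataInputStream(Files.newInputStream(file))) {
            in.readFully(content); // EOFException if the file shrank after Files.size()
        }
        return content;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("nsi", ".log");
        try {
            Files.write(tmp, "line one\nline two\n".getBytes(StandardCharsets.UTF_8));
            System.out.println(readWhole(tmp).length); // 18
        } finally {
            Files.delete(tmp);
        }
    }
}
```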
Input sample:
```
<Dec 12, 2013 1:05:56 AM CST> <Error> <HTTP> <BEA-101017> <[… - appName: 'Agile', name: '/Agile', context-path: '/Agile', spec-version: 'null'] Root cause of ServletException.
javax.servlet.jsp.JspException: Connection reset by peer: socket write error
at com.agile.ui.web.taglib.common.FormTag.writeFormHeader(FormTag.java:498)
at com.agile.ui.web.taglib.common.FormTag.doStartTag(FormTag.java:429)
at jsp_servlet._default.__login_45_cms._jspService(__login_45_cms.java:929)
at weblogic.servlet.jsp.JspBase.service(JspBase.java:34)
at weblogic.servlet.internal.StubSecurityHelper$ServletServiceAction.run(StubSecurityHelper.java:227)
Truncated. see log file for complete stacktrace
>
Retrieving the value for the attribute Page Two.Validation Status for the Object 769630
Retrieving the value for the attribute Page Two.Pilot Required for the Object 769630
Retrieving the value for the attribute Page Two.NPO Contact for the Object 769630
<Dec 12, 2013 1:12:13 AM CST> <Warning> <Socket> <BEA-000449> <Closing socket as no data read from it during the configured idle timeout of 0 secs>
```
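Entries in this log span several lines (a header such as `<Dec 12, 2013 1:05:56 AM CST> ...` followed by stack-trace lines), which is why a line-at-a-time reader does not fit. The question does not show the actual mapper logic, so the following is only a hypothetical example of processing the whole-file value: it assumes a new entry starts at any line beginning with `<`, a three-letter month, a day and a year, and attaches every other line (including the `Retrieving ...` lines) to the preceding entry. `LogEntryGrouper` and `group` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class LogEntryGrouper {
    // Assumed entry header: '<', three-letter month, day, comma, four-digit year, space.
    private static final Pattern ENTRY_START =
            Pattern.compile("^<[A-Z][a-z]{2} \\d{1,2}, \\d{4} ");

    // Splits whole-file text into multi-line entries.
    public static List<String> group(String fileContent) {
        List<String> entries = new ArrayList<>();
        StringBuilder current = null;
        for (String line : fileContent.split("\\r?\\n")) {
            if (ENTRY_START.matcher(line).find()) {
                if (current != null) {
                    entries.add(current.toString()); // close the previous entry
                }
                current = new StringBuilder(line);
            } else if (current != null) {
                current.append('\n').append(line); // continuation line
            }
            // else: lines before the first header are dropped in this sketch
        }
        if (current != null) {
            entries.add(current.toString());
        }
        return entries;
    }

    public static void main(String[] args) {
        // Abridged copy of the input sample above.
        String sample = String.join("\n",
                "<Dec 12, 2013 1:05:56 AM CST> <Error> <HTTP> <BEA-101017> <... Root cause of ServletException.",
                "javax.servlet.jsp.JspException: Connection reset by peer: socket write error",
                "at com.agile.ui.web.taglib.common.FormTag.writeFormHeader(FormTag.java:498)",
                ">",
                "Retrieving the value for the attribute Page Two.Validation Status for the Object 769630",
                "<Dec 12, 2013 1:12:13 AM CST> <Warning> <Socket> <BEA-000449> <Closing socket ...>");
        System.out.println(group(sample).size()); // 2
    }
}
```

In the mapper, `group(value.toString())` could then be applied to the single record that `NSI_record_reader` emits.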
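Thanks.

Have you considered using a custom [InputFormat](https://developer.yahoo.com/hadoop/tutorial/module5.html#fileformat)? If you paste a sample of your input file, it would help decide whether a custom input format would do the trick for you. – Sudarshan

Here you go: I've shared my custom InputFormat and custom RecordReader code along with a small chunk of the input file. – Sam

I have already built a custom InputFormat that stops splitting, so my complete file can be read by one mapper. – Sam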