2013-03-25 52 views
6

重寫RecordReader類的方法「下一步」和TextInputFormat類的「getRecordReader」以便發送整個段落到映射器而不是逐行。 (我用舊的API和認定中對我的款追加至一個空行來在我的文本文件的時間。)
下面是我的代碼:覆蓋RecordReader一次而不是行

public class NLinesInputFormat extends TextInputFormat 
{ 
    @Override 
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter)throws IOException  { 
     reporter.setStatus(split.toString()); 
     return new ParagraphRecordReader(conf, (FileSplit)split); 
    } 
} 



public class ParagraphRecordReader implements RecordReader<LongWritable, Text> 
{ 
     private LineRecordReader lineRecord; 
     private LongWritable lineKey; 
     private Text lineValue; 
     public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException { 
      lineRecord = new LineRecordReader(conf, split); 
      lineKey = lineRecord.createKey(); 
      lineValue = lineRecord.createValue(); 
     } 

     @Override 
     public void close() throws IOException { 
      lineRecord.close(); 
     } 

     @Override 
     public LongWritable createKey() { 
      return new LongWritable(); 

     } 

     @Override 
     public Text createValue() { 
      return new Text(""); 

     } 

     @Override 
     public float getProgress() throws IOException { 
      return lineRecord.getPos(); 

     } 

     @Override 
     public synchronized boolean next(LongWritable key, Text value) throws IOException { 
      boolean appended, gotsomething; 
      boolean retval; 
      byte space[] = {' '}; 
      value.clear(); 
      gotsomething = false; 
      do { 
       appended = false; 
       retval = lineRecord.next(lineKey, lineValue); 
       if (retval) { 
        if (lineValue.toString().length() > 0) { 
         byte[] rawline = lineValue.getBytes(); 
         int rawlinelen = lineValue.getLength(); 
         value.append(rawline, 0, rawlinelen); 
         value.append(space, 0, 1); 
         appended = true; 
        } 
        gotsomething = true; 
       } 
      } while (appended); 

      //System.out.println("ParagraphRecordReader::next() returns "+gotsomething+" after setting value to: ["+value.toString()+"]"); 
      return gotsomething; 
     } 

     @Override 
     public long getPos() throws IOException { 
      return lineRecord.getPos(); 
     } 
    } 

問題:
1.我沒有找到任何具體的指導如何做到這一點,所以可能是我做錯了,請評論任何建議?
2.我能夠正確編譯,但是當我運行我的作業時,我的映射器不斷運行,我無法弄清楚問題出在哪裏?

+0

您是否嘗試過僅輸入一個段落? – Amar 2013-03-25 09:02:21

+0

我認爲你有一個bug;當你穿越分裂時你會得到額外的段落。我認爲你需要區分從0開始的分割和其他分割。從0開始的第一行開始一段,但以行開頭的分割不應該開始一個新段落。 (通常情況下,你已經讀過一個拆分邊界,所以如果你的拆分文件有連續段落的行,它們將會被前一個拆分文件所發出)。我錯過了什麼嗎? – 2017-04-15 22:10:56

回答

3

你的代碼對我來說工作得很好。 我做的唯一改變是將這些類作爲內部類並使它們變爲靜態。

輸入文件如下:

This is awesome. 
WTF is this. 

This is just a test. 

映射器的代碼看起來像:

@Override 
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) 
    throws IOException { 

    System.out.println(key+" : "+value); 
} 

而且輸出是:

0 : This is awesome. WTF is this. 
0 : This is just a test. 

我相信你會避風港忘記設置輸入格式,但爲了以防萬一,請將其設置爲以下內容ws:

conf.setInputFormat(NLinesInputFormat.class); 
+0

感謝您回覆Amar!..我使用這些類作爲公共靜態,並設置了Inputformat,但我沒有嘗試使用小段落,我正在用一個大文件進行測試。我會這樣做,讓你知道它是如何發生的。 – JackSparrow 2013-03-25 14:06:38

+0

嘿謝謝人......我檢查了短輸入文件,它對長文件工作正常這是一些格式問題,我已經弄明白了! – JackSparrow 2013-03-25 17:15:00

+0

@Amar是一個hadoop的初學者,你能解釋下一個方法內部發生了什麼嗎?你能解釋我實現的邏輯嗎?我在這方面需要一點幫助。 – user1585111 2013-09-02 10:53:19