2012-09-14 112 views
0

我想解決以下RecordReader問題。 例輸入文件:Hadoop Map-Reduce。 RecordReader

1,1 
2,2 
3,3 
4,4 
5,5 
6,6 
7,7 
....... 
....... 

我希望我的RecordReader返回

key | Value 
0 |1,1:2,2:3,3:4,4:5,5 
4 |2,2:3,3:......6,6 
6 |3,3:4,4......6,6,7,7 

(對於第一個值前五線,爲第二個值五名來自第二行開始行和第3個值從開始的五線第三線等)

public class MyRecordReader extends RecordReader<LongWritable, Text> {

@Override 
public boolean nextKeyValue() throws IOException, InterruptedException { 

    while (pos < end) { 
     key.set(pos); 
     // five line logic 
     Text nextLine=new Text(); 



     int newSize = in.readLine(value, maxLineLength, 
           Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), 
             maxLineLength)); 
     fileSeek+=newSize; 

     for(int n=0;n<4;n++) 
     { 
      fileSeek+=in.readLine(nextLine, maxLineLength, 
        Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), 
          maxLineLength)); 
      value.append(":".getBytes(), 0,1); 
      value.append(nextLine.getBytes(), 0, nextLine.getLength()); 
     } 
     if (newSize == 0) { 

     return false; 

     } 
     pos += newSize; 
     if (newSize < maxLineLength) { 

     return true; 
     } 

     // line too long. try again 
     LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); 
    } 

    return false; 
} 

}

但是,這是返回值

key | Value 
0 |1,1:2,2:3,3:4,4:5,5 
4 |6,6:7,7.......10,10 
6 |11,11:12,12:......14,14 

有人可以幫助我這個代碼或新代碼RecodeReader會做呢? Requirement of the problem (may help you understand the use case) 感謝

+2

請格式化問題正確的,並正確顯示輸出,那麼我們可能會回答... – codeling

+0

@nyarlathotep:對格式不好的sry。 我試圖改善它, 仍然可以幫助我機智的回答。 –

回答

3

我想我明白這個問題...這裏是我會做什麼:包裝其他RecordReader和緩衝來自它的鍵/值轉換爲本地隊列。

public class MyRecordReader extends RecordReader<LongWritable, Text> { 
    private static final int BUFFER_SIZE = 5; 
    private static final String DELIMITER = ":"; 

    private Queue<String> valueBuffer = new LinkedList<String>(); 
    private Queue<Long> keyBuffer = new LinkedList<Long>(); 
    private LongWritable key = new LongWritable(); 
    private Text value = new Text(); 

    private RecordReader<LongWritable, Text> rr; 
    public MyRecordReader(RecordReader<LongWritable, Text> rr) { 
     this.rr = rr; 
    } 

    @Override 
    public void close() throws IOException { 
     rr.close(); 
    } 

    @Override 
    public LongWritable getCurrentKey() throws IOException, InterruptedException { 
     return key; 
    } 

    @Override 
    public Text getCurrentValue() throws IOException, InterruptedException { 
     return value; 
    } 

    @Override 
    public float getProgress() throws IOException, InterruptedException { 
     return rr.getProgress(); 
    } 

    @Override 
    public void initialize(InputSplit arg0, TaskAttemptContext arg1) 
      throws IOException, InterruptedException { 
     rr.initialize(arg0, arg1); 
    } 

    @Override 
    public boolean nextKeyValue() throws IOException, InterruptedException { 
     if (valueBuffer.isEmpty()) { 
      while (valueBuffer.size() < BUFFER_SIZE) { 
       if (rr.nextKeyValue()) { 
        keyBuffer.add(rr.getCurrentKey().get()); 
        valueBuffer.add(rr.getCurrentValue().toString()); 
       } else { 
        return false; 
       } 
      } 
     } else { 
      if (rr.nextKeyValue()) { 
       keyBuffer.add(rr.getCurrentKey().get()); 
       valueBuffer.add(rr.getCurrentValue().toString()); 
       keyBuffer.remove(); 
       valueBuffer.remove(); 
      } else { 
       return false; 
      } 
     } 
     key.set(keyBuffer.peek()); 
     value.set(getValue()); 
     return true; 
    } 

    private String getValue() { 
     StringBuilder sb = new StringBuilder(); 
     Iterator<String> iter = valueBuffer.iterator(); 
     while (iter.hasNext()) { 
      sb.append(iter.next()); 
      if (iter.hasNext()) sb.append(DELIMITER); 
     } 
     return sb.toString(); 
    } 

} 

再比如說,你可以有一個擴展的TextInputFormat和覆蓋createRecordReader方法來調用super.createRecordReader並返回結果包裹在一個MyRecordReader,像這樣的自定義InputFormat:

public class MyTextInputFormat extends TextInputFormat { 
    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(
       InputSplit arg0, TaskAttemptContext arg1) { 
     return new MyRecordReader(super.createRecordReader(arg0, arg1)); 
    } 
} 
+0

對不起,在運行它之前,我沒有測試過代碼。我編輯它,現在嘗試。 –

+0

謝謝@joe K:它的工作完美。 –