2012-11-12 73 views
2

我有一個巨大的文本文件,我想分割文件,使每個塊有5行。我實現了我自己的GWASInputFormat和GWASRecordReader類。但是我的問題是,在下面的代碼(這是我從http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/複製),我有以下行Hadoop Map Reduce CustomSplit/CustomRecordReader

FileSplit split = (FileSplit) genericSplit; 
final Path file = split.getPath(); 
Configuration conf = context.getConfiguration(); 

我的問題是,該文件已經由當時的初始化(拆分initialize()方法中)方法在我的GWASRecordReader類中調用?我認爲我在GWASRecordReader類中做了它(拆分)。讓我知道我的思維過程是否正確。

package com.test; 

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.util.LineReader; 

public class GWASRecordReader extends RecordReader<LongWritable, Text> { 

private final int NLINESTOPROCESS = 5; 
private LineReader in; 
private LongWritable key; 
private Text value = new Text(); 
private long start = 0; 
private long pos = 0; 
private long end = 0; 
private int maxLineLength; 

public void close() throws IOException { 
    if(in != null) { 
     in.close(); 
    } 
} 

public LongWritable getCurrentKey() throws IOException, InterruptedException { 
    return key; 
} 

public Text getCurrentValue() throws IOException, InterruptedException { 
    return value; 
} 

public float getProgress() throws IOException, InterruptedException { 
    if(start == end) { 
     return 0.0f; 
    } 
    else { 
     return Math.min(1.0f, (pos - start)/(float) (end - start)); 
    } 
} 

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { 
    FileSplit split = (FileSplit) genericSplit; 
    final Path file = split.getPath(); 
    Configuration conf = context.getConfiguration(); 
    this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE); 
    FileSystem fs = file.getFileSystem(conf); 
    start = split.getStart(); 
    end = start + split.getLength(); 
    System.out.println("---------------SPLIT LENGTH---------------------" + split.getLength()); 
    boolean skipFirstLine = false; 
    FSDataInputStream filein = fs.open(split.getPath()); 

    if(start != 0) { 
     skipFirstLine = true; 
     --start; 
     filein.seek(start); 
    } 

    in = new LineReader(filein, conf); 
    if(skipFirstLine) { 
     start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start)); 
    } 
    this.pos = start; 
} 

public boolean nextKeyValue() throws IOException, InterruptedException { 
    if (key == null) { 
     key = new LongWritable(); 
    } 

    key.set(pos); 

    if (value == null) { 
     value = new Text(); 
    } 
    value.clear(); 
    final Text endline = new Text("\n"); 
    int newSize = 0; 
    for(int i=0; i<NLINESTOPROCESS;i++) { 
     Text v = new Text(); 
     while(pos < end) { 
      newSize = in.readLine(v ,maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); 
      value.append(v.getBytes(), 0, v.getLength()); 
      value.append(endline.getBytes(),0,endline.getLength()); 
      if(newSize == 0) { 
       break; 
      } 
      pos += newSize; 
      if(newSize < maxLineLength) { 
       break; 
      } 
     } 
    } 

    if(newSize == 0) { 
     key = null; 
     value = null; 
     return false; 
    } else { 
     return true; 
    } 
} 
} 

回答

7

是的,輸入文件已經被拆分。它基本上是這樣的:

your input file(s) -> InputSplit -> RecordReader -> Mapper...

基本上,InputSplit斷輸入成塊,RecordReader中斷這些塊成鍵/值對。請注意,InputSplitRecordReader將由您使用的InputFormat確定。例如,TextInputFormat使用FileSplit來拆分輸入,然後使用LineRecordReader來處理每個單獨的行,並將該位置作爲關鍵字,並將行本身作爲值。 因此,在您的GWASInputFormat中,您需要查看您使用什麼樣的FileSplit以查看它傳遞給GWASRecordReader的內容。

我建議看看NLineInputFormat哪個「將N行輸入拆分爲一個拆分」。它可能能夠完全做你正在努力做的事情。

如果你想在時間的價值得到5行,第一個作爲關鍵的行號,我會說你可以用定製的NLineInputFormat和定製LineRecordReader做到這一點。你不需要擔心我認爲的輸入分割,因爲輸入格式可以將它分割成5行。您的RecordReaderLineRecordReader非常相似,但不是獲取塊開始的字節位置,而是獲取行號。所以代碼將幾乎完全相同,除了那個小的改變。所以你基本上可以複製並粘貼NLineInputFormatLineRecordReader,但是輸入格式會使用你的記錄閱讀器來獲取行號。代碼將非常相似。

+0

非常感謝。這清除了一些事情。我想跟蹤輸入文件的行號,並將行號和輸入記錄一起作爲值映射到映射器。所以看起來我必須使用自己的Split,因爲我現在正在做的方式已經分割了文件。你可以讓我知道我有什麼選擇(我想我需要重寫computeSplitSize()方法)。我搜索了所有的網頁,但沒有找到一個具體的答案,如果我們可以做到或沒有 – user1707141

+0

@ user1707141我更新了我的答案,以解決這個問題。讓我知道它是否合理,或者我需要更好地解釋。 –

+0

這個答案的第一部分保存了我的生活,我有一個類似的問題,我明白我的關注將是RecordReader。我想找到由字符串分隔的記錄,所以我去了,發現了這篇很棒的文章。我知道這是一個遲到的答案,但它可能是有用的需要此人的任何人:http://hadoopi.wordpress.com/2013/05/31/custom-recordreader-processing-string-pattern-delimited-records/ –