Hadoop Map Reduce CustomSplit/CustomRecordReader

I have a huge text file and I want to split it so that each chunk has 5 lines. I implemented my own GWASInputFormat and GWASRecordReader classes. My question, however, is about the following lines in the code below (which I copied from http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/):
FileSplit split = (FileSplit) genericSplit;
final Path file = split.getPath();
Configuration conf = context.getConfiguration();
My question is: has the split for the file already been set up by the time the initialize() method in my GWASRecordReader class is called? I thought I was doing the splitting inside the GWASRecordReader class itself. Let me know whether my thought process here is correct.
package com.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class GWASRecordReader extends RecordReader<LongWritable, Text> {

    // Number of lines to pack into each key/value pair handed to the mapper.
    private final int NLINESTOPROCESS = 5;

    private LineReader in;
    private LongWritable key;
    private Text value = new Text();
    private long start = 0;
    private long pos = 0;
    private long end = 0;
    private int maxLineLength;

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        // The framework hands in a fully populated FileSplit: the path, start
        // offset and length were already computed by the InputFormat's getSplits().
        FileSplit split = (FileSplit) genericSplit;
        final Path file = split.getPath();
        Configuration conf = context.getConfiguration();
        this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        FileSystem fs = file.getFileSystem(conf);

        start = split.getStart();
        end = start + split.getLength();
        System.out.println("---------------SPLIT LENGTH---------------------" + split.getLength());

        boolean skipFirstLine = false;
        FSDataInputStream filein = fs.open(split.getPath());
        if (start != 0) {
            // Not at the beginning of the file: back up one byte and discard the
            // partial first line, which belongs to the previous split.
            skipFirstLine = true;
            --start;
            filein.seek(start);
        }
        in = new LineReader(filein, conf);
        if (skipFirstLine) {
            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        value.clear();
        final Text endline = new Text("\n");
        int newSize = 0;

        // Read up to NLINESTOPROCESS lines and concatenate them into one value.
        for (int i = 0; i < NLINESTOPROCESS; i++) {
            Text v = new Text();
            while (pos < end) {
                newSize = in.readLine(v, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
                if (newSize == 0) {
                    break; // nothing left to read in this split
                }
                value.append(v.getBytes(), 0, v.getLength());
                value.append(endline.getBytes(), 0, endline.getLength());
                pos += newSize;
                if (newSize < maxLineLength) {
                    break; // read one complete line, move on to the next
                }
            }
        }

        // Emit the record if anything was read; otherwise signal end of split.
        if (value.getLength() == 0) {
            key = null;
            value = null;
            return false;
        }
        return true;
    }
}
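
For context, a RecordReader like this is normally wired up through a FileInputFormat subclass. The GWASInputFormat below is only an assumed sketch (the actual class is not shown in the question), but it illustrates the flow the question asks about: FileInputFormat.getSplits() computes the FileSplits up front, and the framework then calls createRecordReader() and initialize() for each split, so the split's path, start and length are already populated when initialize() runs.

package com.test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hypothetical sketch of the input format (the real GWASInputFormat is not shown).
// The splits themselves come from FileInputFormat.getSplits(); this class only
// tells the framework which RecordReader to create for each split.
public class GWASInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        return new GWASRecordReader();
    }
}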
Thank you very much, that clears a few things up. I want to keep track of the line numbers in the input file and pass the line number together with the input record to the mapper as the value. So it looks like I have to use my own split, because the way I am doing it now the file has already been split. Could you let me know what options I have (I think I need to override the computeSplitSize() method)? I searched all over the web but could not find a concrete answer on whether this can be done or not. – user1707141
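
(One option that may avoid overriding computeSplitSize() is Hadoop's built-in NLineInputFormat, which already produces one split per N input lines; it lives under org.apache.hadoop.mapreduce.lib.input in the new API and org.apache.hadoop.mapred.lib in the old one. The driver below is only a rough sketch under that assumption; the class name, job name and paths are placeholders.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch of a driver that uses the built-in NLineInputFormat instead of a
// custom split size. NLineInputFormat creates one InputSplit per N lines of
// input; class name, job name and paths are placeholders.
public class GWASDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "gwas-nline");
        job.setJarByClass(GWASDriver.class);
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(job, 5);   // 5 lines per split
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that, as far as I know, NLineInputFormat still hands the mapper one line per map() call, keyed by the line's byte offset in the file rather than a line number, so a true line number would still have to be derived in a custom RecordReader or in the mapper itself.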
@user1707141 I updated my answer to address this. Let me know whether it makes sense or whether I need to explain it better. –
The first part of this answer was a lifesaver. I had a similar problem and realized my concern was the RecordReader. I wanted to find records delimited by a string, so I went looking and found this great article. I know this is a late answer, but it may be useful to anyone who needs it: http://hadoopi.wordpress.com/2013/05/31/custom-recordreader-processing-string-pattern-delimited-records/ –
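
(For the string-delimited records the linked article covers, it is also worth checking whether your Hadoop version honours the textinputformat.record.delimiter property, which lets the stock TextInputFormat split on an arbitrary string without a custom RecordReader. A rough sketch, assuming a release that supports the property; the class name, job name, delimiter and paths are placeholders.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: on Hadoop versions that support it, the stock
// TextInputFormat splits records on the configured delimiter string
// instead of "\n", so no custom RecordReader is needed.
public class DelimitedRecordsDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("textinputformat.record.delimiter", "RECORD_END"); // placeholder delimiter
        Job job = new Job(conf, "delimited-records");
        job.setJarByClass(DelimitedRecordsDriver.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}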