我想使用Hadoop 0.20.0/0.20.2的CombineFileInputFormat,這樣它就可以爲每個記錄處理1個文件,而且不會影響數據 - 局部性(通常需要處理)。如何在Hadoop中使用CombineFileInputFormat?
它在湯姆懷特的Hadoop權威指南中提到,但他沒有展示如何去做。相反,他轉向序列文件。
我對記錄閱讀器中處理變量的含義非常困惑。任何代碼示例都會有很大的幫助。
預先感謝..
我想使用Hadoop 0.20.0/0.20.2的CombineFileInputFormat,這樣它就可以爲每個記錄處理1個文件,而且不會影響數據 - 局部性(通常需要處理)。如何在Hadoop中使用CombineFileInputFormat?
它在湯姆懷特的Hadoop權威指南中提到,但他沒有展示如何去做。相反,他轉向序列文件。
我對記錄閱讀器中處理變量的含義非常困惑。任何代碼示例都會有很大的幫助。
預先感謝..
檢查用於組合文件的輸入格式的下面輸入格式。
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
* CustomInputformat which implements the createRecordReader of abstract class CombineFileInputFormat
*/
public class MyCombineFileInputFormat extends CombineFileInputFormat {
public static class MyRecordReader extends RecordReader<LongWritable,Text>{
private LineRecordReader delegate=null;
private int idx;
public MyRecordReader(CombineFileSplit split,TaskAttemptContext taskcontext ,Integer idx) throws IOException {
this.idx=idx;
delegate = new LineRecordReader();
}
@Override
public void close() throws IOException {
delegate.close();
}
@Override
public float getProgress() {
try {
return delegate.getProgress();
}
catch(Exception e) {
return 0;
}
}
@Override
public void initialize(InputSplit split, TaskAttemptContext taskcontext) throws IOException {
CombineFileSplit csplit=(CombineFileSplit)split;
FileSplit fileSplit = new FileSplit(csplit.getPath(idx), csplit.getOffset(idx), csplit.getLength(idx), csplit.getLocations());
delegate.initialize(fileSplit, taskcontext);
}
@Override
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
return delegate.getCurrentKey();
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return delegate.getCurrentValue();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return delegate.nextKeyValue();
}
}
@SuppressWarnings("unchecked")
@Override
public RecordReader createRecordReader(InputSplit split,TaskAttemptContext taskcontext) throws IOException {
return new CombineFileRecordReader((CombineFileSplit) split, taskcontext, MyRecordReader.class);
}
}
以下是從所謂的「新API」中使用CombineFileInputFormat的最簡單方法。假設您的實際輸入格式爲MyFormat,並將其與的myKey的按鍵和myvalue的的價值工程(可能是SequenceFileInputFormat< MyKey, MyValue >
一些子類,例如)。
public class CombinedMyFormat extends CombineFileInputFormat< MyKey, MyValue > {
// exists merely to fix the key/value types and
// inject the delegate format to the superclass
// if MyFormat does not use state, consider a constant instead
private static class CombineMyKeyMyValueReaderWrapper
extends CombineFileRecordReaderWrapper< MyKey, MyValue > {
protected CombineMyKeyMyValueReaderWrapper(
CombineFileSplit split, TaskAttemptContext ctx, Integer idx
) throws IOException, InterruptedException {
super(new MyFormat(), split, ctx, idx);
}
}
@Override
public RecordReader< MyKey, MyValue > createRecordReader(
InputSplit split, TaskAttemptContext ctx
) throws IOException {
return new CombineFileRecordReader< MyKey, MyValue >(
(CombineFileSplit)split, ctx, CombineMyKeyMyValueReaderWrapper.class
);
}
}
在你的工作的驅動程序,您現在應該能夠只是CombinedMyFormat
跌幅爲MyFormat
。您還應該設置一個max split size property以防止Hadoop將整個輸入合併爲一個分割。
你能詳細說明你的意思嗎?每個記錄有一個文件? –