2011-05-04 23 views
4

我想將整個文件用作MAP處理的單個記錄,文件名作爲關鍵字。
我讀過以下帖子:How to get Filename/File Contents as key/value input for MAP when running a Hadoop MapReduce Job?
雖然頂級答案的理論是堅實的,但沒有實際提供代碼或「如何做」。FileInputFormat其中文件名爲KEY且文本內容爲VALUE

這是我的自定義FileInputFormat和相應的RecordReader,它編譯,但不產生任何記錄數據。
感謝您的幫助。

public class CommentsInput 
    extends FileInputFormat<Text,Text> { 
protected boolean isSplitable(FileSystem fs, Path filename) 
{ 
    return false; 
} 
@Override 
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext ctx) 
     throws IOException, InterruptedException { 
    return new CommentFileRecordReader((FileSplit) split, ctx.getConfiguration()); 
} 

/////////////////////////

public class CommentFileRecordReader 
    extends RecordReader<Text,Text> { 
private InputStream in; 
private long start; 
private long length; 
private long position; 
private Text key; 
private Text value; 
private boolean processed; 
private FileSplit fileSplit; 
private Configuration conf; 

public CommentFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException 
{ 
    this.fileSplit = fileSplit; 
    this.conf=conf; 
} 

/** Boilerplate initialization code for file input streams. */ 
@Override 
public void initialize(InputSplit split, 
        TaskAttemptContext context) 
         throws IOException, InterruptedException { 
    Configuration conf = context.getConfiguration(); 

    fileSplit = (FileSplit) split; 
    this.start = fileSplit.getStart(); 
    this.length = fileSplit.getLength(); 
    this.position = 0; 
    this.processed = false; 

    Path path = fileSplit.getPath(); 
    FileSystem fs = path.getFileSystem(conf); 
    FSDataInputStream in = fs.open(path); 

    CompressionCodecFactory codecs = new CompressionCodecFactory(conf); 
    CompressionCodec codec = codecs.getCodec(path); 
    if (codec != null) 
     this.in = codec.createInputStream(in); 
    else 
     this.in = in; 

    // If using Writables: 
    // key = new Text(); 
    // value = new Text(); 
} 
public boolean next(Text key, Text value) throws IOException 
{ 
    if(!processed) 
    { 
     key = new Text(fileSplit.getPath().toString()); 
     Path file = fileSplit.getPath(); 
     FileSystem fs = file.getFileSystem(conf); 
     FSDataInputStream in = null; 
     byte[] contents = new byte[(int) fileSplit.getLength()]; 
     try 
     { 
      in = fs.open(file); 
      IOUtils.readFully(in, contents, 0, contents.length); 
      value.set(contents.toString()); 
     } 
     finally 
     { 
      IOUtils.closeStream(in); 
     } 
     processed = true; 
     return true; 
    } 
    return false; 
} 

@Override 
public boolean nextKeyValue() throws IOException { 
    // TODO parse the next key value, update position and return true. 
    return false; 
} 

@Override 
public Text getCurrentKey() { 
    return key; 
} 

@Override 
public Text getCurrentValue() { 
    return value; 
} 

/** Returns our progress within the split, as a float between 0 and 1. */ 
@Override 
public float getProgress() { 
    if (length == 0) 
     return 0.0f; 
    return Math.min(1.0f, position/(float)length); 
} 

@Override 
public void close() throws IOException { 
    if (in != null) 
     in.close(); 
} 
} 
+1

您是否能夠找到實現此目的的方法? – aa8y 2013-01-08 11:04:51

+0

您是否得到了正確的解決方案? – 2016-02-01 05:06:32

回答

1

你需要找到一種方法來定義自己的密鑰並確保你的課程使用它。您可以查看如何定義自己的密鑰類,並且可以通過調用其路徑中的方法然後使用它來創建密鑰來獲取文件名。

相關問題