2010-05-16 82 views
1

有人可以通過使用從DDL生成的類讀取和寫入數據的基本工作流程來引導我嗎?Hadoop/MapReduce:讀取和寫入從DDL生成的類

我用DDL定義了一些類似結構的記錄。例如:

class Customer { 
    ustring FirstName; 
    ustring LastName; 
    ustring CardNo; 
    long LastPurchase; 
    } 

我編譯了這個以獲得一個Customer類並將其包含到我的項目中。我可以很容易地看到如何將它用作映射器和縮減器(生成的類實現Writable)的輸入和輸出,但不知道如何將它讀取和寫入文件。

關於org.apache.hadoop.record軟件包的JavaDoc討論瞭如何以二進制,CSV或XML格式對這些記錄進行序列化。我該如何做到這一點?假設我的reducer生成IntWritable鍵和Customer值。我使用什麼OutputFormat以CSV格式編寫結果?如果我想對它們執行分析,我將在以後使用什麼InputFormat來讀取結果文件?

回答

1

好的,所以我覺得我有這個想法。我不確定它是否是最直接的方式,所以如果您知道更簡單的工作流程,請糾正我。

從DDL生成的每個類實現記錄接口,並因此提供了兩種方法:

序列化(RecordOutput出)用於寫入 反序列化(RecordInput中)用於讀取

RecordOutputRecordInput是提供在org.apache.hadoop.record包中的實用接口。有幾個實現(例如XMLRecordOutputBinaryRecordOutputCSVRecordOutput

據我所知,你必須實現自己的OUTPUTFORMATInputFormat類使用這些。這很容易做到。

例如,我在原來的問題談到了(一個在CSV格式輸出整數鍵和客戶價值)的OUTPUTFORMAT將實施這樣的:


    private static class CustomerOutputFormat 
    extends TextOutputFormat<IntWritable, Customer> 
    { 

    public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored, 
     JobConf job, 
     String name, 
     Progressable progress) 
    throws IOException { 
     Path file = FileOutputFormat.getTaskOutputPath(job, name); 
     FileSystem fs = file.getFileSystem(job); 
     FSDataOutputStream fileOut = fs.create(file, progress); 
     return new CustomerRecordWriter(fileOut); 
    } 

    protected static class CustomerRecordWriter 
     implements RecordWriter<IntWritable, Customer> 
    { 

     protected DataOutputStream outStream ; 

     public AnchorRecordWriter(DataOutputStream out) { 
     this.outStream = out ; 
     } 

     public synchronized void write(IntWritable key, Customer value) throws IOException { 

     CsvRecordOutput csvOutput = new CsvRecordOutput(outStream); 
     csvOutput.writeInteger(key.get(), "id") ; 
     value.serialize(csvOutput) ; 
     } 

     public synchronized void close(Reporter reporter) throws IOException { 
     outStream.close(); 
     } 
    } 
    } 

創建InputFormat是一樣的。由於csv格式是每行一個條目,因此我們可以在內部使用LineRecordReader來完成大部分工作。



private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> { 

    public RecordReader<IntWritable, Customer> getRecordReader(
    InputSplit genericSplit, 
    JobConf job, 
    Reporter reporter) 
    throws IOException { 

    reporter.setStatus(genericSplit.toString()); 
    return new CustomerRecordReader(job, (FileSplit) genericSplit); 
    } 

    private class CustomerRecordReader implements RecordReader<IntWritable, Customer> { 

    private LineRecordReader lrr ; 

    public CustomerRecordReader(Configuration job, FileSplit split) 
    throws IOException{ 
     this.lrr = new LineRecordReader(job, split);  
    } 

    public IntWritable createKey() { 
     return new IntWritable(); 
    } 

    public Customer createValue() { 
     return new Customer(); 
    } 

    public synchronized boolean next(IntWritable key, Customer value) 
    throws IOException { 

     LongWritable offset = new LongWritable() ; 
     Text line = new Text() ; 

     if (!lrr.next(offset, line)) 
     return false ; 

     CsvRecordInput cri = new CsvRecordInput(new  
     ByteArrayInputStream(line.toString().getBytes())) ; 
     key.set(cri.readInt("id")) ; 
     value.deserialize(cri) ; 

     return true ; 
    } 

    public float getProgress() { 
     return lrr.getProgress() ; 
    } 

    public synchronized long getPos() throws IOException { 
     return lrr.getPos() ; 
    } 

    public synchronized void close() throws IOException { 
     lrr.close(); 
    } 
    } 
}