Hadoop MapReduce chain ArrayWritable

I am trying to create a MapReduce chain composed of two steps. The first reducer emits key-value pairs of the form (key, value), where the value is a list of custom objects, and the second mapper is supposed to read the first reducer's output. The list is a custom ArrayWritable. Here is the relevant code.

The custom object:

public class Custom implements Writable { 
    private Text document; 
    private IntWritable count; 

    public Custom(){ 
     setDocument(""); 
     setCount(0); 
    } 

    public Custom(String document, int count) { 
     setDocument(document); 
     setCount(count); 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     // TODO Auto-generated method stub 
     document.readFields(in); 
     count.readFields(in); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     document.write(out); 
     count.write(out); 
    } 

    @Override 
    public String toString() { 
     return this.document.toString() + "\t" + this.count.toString(); 
    } 

    public int getCount() { 
     return count.get(); 
    } 

    public void setCount(int count) { 
     this.count = new IntWritable(count); 
    } 

    public String getDocument() { 
     return document.toString(); 
    } 

    public void setDocument(String document) { 
     this.document = new Text(document); 
    } 
} 

The custom ArrayWritable:

class MyArrayWritable extends ArrayWritable { 
    public MyArrayWritable(Writable[] values) { 
     super(Custom.class, values); 
    } 

    public MyArrayWritable() { 
     super(Custom.class); 
    } 

    @Override 
    public Custom[] get() { 
     return (Custom[]) super.get(); 
    } 

    @Override 
    public String toString() { 
     return Arrays.toString(get()); 
    } 

    @Override 
    public void write(DataOutput arg0) throws IOException { 
     super.write(arg0); 
    } 
} 

The first reducer:

public static class NGramReducer extends Reducer<Text, Text, Text, MyArrayWritable> { 
    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     //other code 
     context.write(key, mArrayWritable); 
    } 
} 
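
For context, a minimal sketch of what the omitted reducer body might look like when it assembles the MyArrayWritable (the original only shows "//other code", so the list handling and the way each Custom is built below are assumptions, not the asker's actual logic):

public static class NGramReducer extends Reducer<Text, Text, Text, MyArrayWritable> { 
    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     // hypothetical: wrap each incoming value in a Custom with a count of 1 
     List<Custom> customs = new ArrayList<>(); 
     for (Text value : values) { 
      customs.add(new Custom(value.toString(), 1)); 
     } 
     // Custom[] is a Writable[], so it fits the MyArrayWritable constructor 
     MyArrayWritable mArrayWritable = new MyArrayWritable(customs.toArray(new Custom[0])); 
     context.write(key, mArrayWritable); 
    } 
} 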

The second mapper:

public static class SecondMapper extends Mapper<Text, MyArrayWritable, Text, IntWritable> { 
    private StringBuilder docBuilder= new StringBuilder(); 

    public void map(Text key, MyArrayWritable value, Context context) throws IOException, InterruptedException { 
     //whatever code 
    } 
} 

And these are the settings in main:

//... 
    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(MyArrayWritable.class); 
    job1.setInputFormatClass(WholeFileInputFormat.class); 
    FileInputFormat.addInputPath(job1, new Path(args[2])); 
    FileOutputFormat.setOutputPath(job1, TEMP_PATH); 
    //... 
    job2.setInputFormatClass(KeyValueTextInputFormat.class); 
    FileInputFormat.addInputPath(job2, TEMP_PATH); 
    FileOutputFormat.setOutputPath(job2, new Path(args[3])); 

When I run it, I get this error:
Error: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to Detector$MyArrayWritable

What is the problem? Do I have to write a FileInputFormat? (Job 1 works fine.)

Answer


It looks like this is because of your job 2 InputFormat. KeyValueTextInputFormat.class expects both the key and the value to be Text objects. Since your job 1 outputs (Text, MyArrayWritable), there is a conflict with the value.

Luckily you don't have to write a custom OutputFormat to cater for your data! Simply write your job 1 output data into sequence files, which keep the data in its binary form:

//... 
job1.setOutputKeyClass(Text.class); 
job1.setOutputValueClass(MyArrayWritable.class); 
job1.setInputFormatClass(WholeFileInputFormat.class); 
job1.setOutputFormatClass(SequenceFileOutputFormat.class); 

FileInputFormat.addInputPath(job1, new Path(args[2])); 
SequenceFileOutputFormat.setOutputPath(job1, TEMP_PATH); 
//... 
job2.setInputFormatClass(SequenceFileInputFormat.class); 
SequenceFileInputFormat.addInputPath(job2, TEMP_PATH); 
FileOutputFormat.setOutputPath(job2, new Path(args[3])); 
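
For completeness, the rest of the job 2 wiring that the answer implies but does not show might look like the lines below; the mapper and output classes are assumptions based on the SecondMapper signature in the question:

job2.setMapperClass(SecondMapper.class); 
job2.setOutputKeyClass(Text.class); 
job2.setOutputValueClass(IntWritable.class); 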

I modified the inputs and outputs, but now I get this error: "Error: java.lang.RuntimeException: java.lang.NoSuchMethodException: Detector$MyArrayWritable.<init>()"


http://stackoverflow.com/questions/11446635/no-such-method-exception-hadoop-init seems to answer that one
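
Based on the linked question, the likely fix for that NoSuchMethodException is to declare MyArrayWritable as a public static nested class (or move it into its own top-level file) so that Hadoop can instantiate it reflectively through its no-argument constructor. A minimal sketch, assuming the enclosing class is called Detector as in the error message:

public class Detector { 
    // public and static so Hadoop can call the no-arg constructor via reflection 
    public static class MyArrayWritable extends ArrayWritable { 
        public MyArrayWritable() { 
            super(Custom.class); 
        } 

        public MyArrayWritable(Writable[] values) { 
            super(Custom.class, values); 
        } 
    } 
} 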