2016-04-24 107 views

Type mismatch in mapper when using the MultipleInputs class

I have defined 2 mappers to process 2 files separately.

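The first mapper (customerMapper) outputs <IntWritable, StationObj>; the second mapper (countryMapper) outputs <IntWritable, Text>.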

In the driver code I register both mappers with the MultipleInputs class using addInputPath().

When I run the jar, I get a type mismatch error.

16/04/24 18:40:28 INFO mapreduce.Job: Task Id : attempt_1461435780053_0008_m_000001_0, Status : FAILED 
Error: java.io.IOException: Type mismatch in value from map: expected hadoop.StationObj, received org.apache.hadoop.io.Text 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1077) 

Here is the code:

public static class customerMapper extends Mapper<LongWritable,Text,IntWritable,StationObj> 
    { 
     IntWritable outkey=new IntWritable(); 
     StationObj outvalue=new StationObj(); 

     //2,Russia,Jhonson,10000 

     public void map(LongWritable key,Text values,Context context) throws IOException, InterruptedException 
     { 
      String []cols=values.toString().split(","); 
      outkey.set(Integer.parseInt(cols[0])); 
      outvalue.setAmount(Integer.parseInt(cols[3])); 
      outvalue.setCountry(cols[1]); 
      outvalue.setProduct(cols[2]); 

      context.write(outkey, outvalue); 
     } 
    } 

    public static class countryMapper extends Mapper<LongWritable,Text,IntWritable,Text> 
    { 
     IntWritable outkey=new IntWritable(); 
     Text outvalue=new Text(); 
     public void map(LongWritable key,Text values,Context context) throws IOException, InterruptedException 
     { 
      String []cols=values.toString().split(","); 

      outkey.set(Integer.parseInt(cols[0])); 
      outvalue.set(cols[1]); 
      context.write(outkey,outvalue); 
     } 
    } 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
     Configuration conf=new Configuration(); 
     Job job=new Job(conf,"dsddd"); 
     job.setJarByClass(stationRedJoin.class); 
     job.setMapOutputKeyClass(IntWritable.class); 

     //job.setMaxMapAttempts(1); 

     MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, customerMapper.class); 
     MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, countryMapper.class); 

     FileOutputFormat.setOutputPath(job, new Path(args[2])); 

     // Exit with 0 on success and 1 on failure (the original had these reversed)
     System.exit(job.waitForCompletion(true) ? 0 : 1); 

    } 

} 

Answer


It is better to declare the output types of both mappers, and of the reducer (if any), in the driver class, like this:

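//output types for the mappers (one key class and one value class for the whole job)
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); // <--- this call is missing from the question's driver

//output types for the reducer (if any)
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 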

//use MultipleInputs and specify different Record class and Input formats 
     MultipleInputs.addInputPath(job, fPath, FirstInputFormat.class, MyFirstMap.class); 
     MultipleInputs.addInputPath(job, sPath, SecondInputFormat.class, MySecondMap.class); 
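
Why one value type matters can be read off the stack trace in the question: the map output collector compares the runtime class of every emitted value with the single map output value class configured for the job, and fails on any difference. The following is a simplified plain-Java stand-in for that check, offered as a sketch and not as Hadoop source; `CollectorSketch`, `StationValue`, and `TextValue` are hypothetical names used only for illustration.

```java
import java.io.IOException;

// Simplified stand-in for the class check a map output collector performs.
// All names here are hypothetical; this is not Hadoop code.
final class CollectorSketch {
    static final class StationValue {}   // stands in for StationObj
    static final class TextValue {}      // stands in for Text

    // The single value class configured for the whole job.
    private final Class<?> valueClass;

    CollectorSketch(Class<?> valueClass) {
        this.valueClass = valueClass;
    }

    // Accepts a value only if its runtime class is exactly the configured class.
    void collect(Object value) throws IOException {
        if (value.getClass() != valueClass) {
            throw new IOException("Type mismatch in value from map: expected "
                    + valueClass.getName() + ", received " + value.getClass().getName());
        }
    }

    public static void main(String[] args) throws IOException {
        CollectorSketch collector = new CollectorSketch(StationValue.class);
        collector.collect(new StationValue());      // accepted: matches the configured class
        try {
            collector.collect(new TextValue());     // rejected: a second mapper's value type
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this model, whichever value class the driver configures, the mapper that emits the other type will fail, so setting the type in the driver only helps once both mappers emit that same type.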

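I have 2 mappers. Below are the input and output types of both. First mapper: customerMapper extends Mapper<LongWritable,Text,IntWritable,StationObj>. Second mapper: countryMapper extends Mapper<LongWritable,Text,IntWritable,Text>. The output types of the two mappers are different.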
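
Because both mappers feed the same shuffle, one common way to reconcile differing value types is to have both emit the same value class (for example Text), prefixed with a short tag naming the source file, and let the reducer split on the tag. The sketch below models that idea in plain Java, with String standing in for Text. `TaggedJoinSketch`, the `C`/`N` tags, the tab delimiter, and the sample country line `2,RU` are assumptions made for illustration; the customer field layout follows the sample record `2,Russia,Jhonson,10000` from the question.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java model of a tagged reduce-side join; String stands in for Text.
// Class name, tags, and delimiter are hypothetical choices.
final class TaggedJoinSketch {

    // Models customerMapper: input "id,country,product,amount", value tagged "C".
    static Map.Entry<Integer, String> customerMap(String line) {
        String[] cols = line.split(",");
        return new SimpleEntry<>(Integer.valueOf(cols[0]),
                "C\t" + cols[1] + "\t" + cols[2] + "\t" + cols[3]);
    }

    // Models countryMapper: input "id,name", value tagged "N".
    static Map.Entry<Integer, String> countryMap(String line) {
        String[] cols = line.split(",");
        return new SimpleEntry<>(Integer.valueOf(cols[0]), "N\t" + cols[1]);
    }

    // Models the reducer for one key: separates values by tag, then pairs every
    // customer record with the country-file value (inner join, no output if absent).
    static List<String> reduce(int key, List<String> values) {
        String name = null;
        List<String> customers = new ArrayList<>();
        for (String v : values) {
            if (v.startsWith("N\t")) {
                name = v.substring(2);
            } else if (v.startsWith("C\t")) {
                customers.add(v.substring(2));
            }
        }
        List<String> out = new ArrayList<>();
        if (name != null) {
            for (String c : customers) {
                out.add(key + "\t" + name + "\t" + c);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map.Entry<Integer, String> a = customerMap("2,Russia,Jhonson,10000");
        Map.Entry<Integer, String> b = countryMap("2,RU");
        // Both entries share key 2, so they would arrive at the same reduce call.
        System.out.println(reduce(a.getKey(), Arrays.asList(a.getValue(), b.getValue())));
    }
}
```

In an actual job this would correspond to declaring customerMapper with Text as its output value type and calling job.setMapOutputValueClass(Text.class) once in the driver. If the structured StationObj needs to survive the shuffle, a wrapper value type such as Hadoop's GenericWritable may be worth a look instead.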
