2011-07-27 76 views
6

Hadoop MapReduce: chaining mappers in the driver of a MapReduce job

I have a MapReduce job. The code of my Map class:

public static class MapClass extends Mapper<Text, Text, Text, LongWritable> { 

    @Override 
    public void map(Text key, Text value, Context context) 
     throws IOException, InterruptedException { 
    } 
} 

And I want to use ChainMapper:

1. Job job = new Job(conf, "Job with chained tasks"); 
2. job.setJarByClass(MapReduce.class); 
3. job.setInputFormatClass(TextInputFormat.class); 
4. job.setOutputFormatClass(TextOutputFormat.class); 

5. FileInputFormat.setInputPaths(job, new Path(InputFile)); 
6. FileOutputFormat.setOutputPath(job, new Path(OutputFile)); 

7. JobConf map1 = new JobConf(false); 

8. ChainMapper.addMapper(
     job, 
     MapClass.class, 
     Text.class, 
     Text.class, 
     Text.class, 
     Text.class, 
     true, 
     map1 
     ); 

But it reports an error at line 8:

Multiple markers at this line
 - The method addMapper(JobConf, Class<? extends Mapper>, Class, Class, Class, Class, boolean, JobConf) in the type ChainMapper is not applicable for the arguments (Job, Class, Class, Class, Class, Class, boolean, Configuration)
 - Debug Current Instruction Pointer

Answers

0

You have to use Configuration instead of JobConf. JobConf is a subclass of Configuration, so there should be a constructor for it.
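If you keep the new-API driver (Job/Configuration) from the question, note that a ChainMapper for the new API lives in a different package. A sketch, assuming the Hadoop 2.x class org.apache.hadoop.mapreduce.lib.chain.ChainMapper, whose addMapper takes a Job and a Configuration and has no boolean byValue flag (this is a driver fragment; job and MapClass are the ones from the question):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;

    Configuration map1 = new Configuration(false);
    ChainMapper.addMapper(job, MapClass.class,
        Text.class, Text.class, // input key/value classes
        Text.class, Text.class, // output key/value classes
        map1);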

0

For the first argument of your ChainMapper.addMapper(), you passed the job object, but the function expects an object of type JobConf. Rewrite it as:

 
ChainMapper.addMapper(
      (JobConf)conf, 
      MapClass.class, 
      Text.class, 
      Text.class, 
      Text.class, 
      Text.class, 
      true, 
      map1 
      ); 

That should fix the problem.

+0

He already has a JobConf; what he needs is a Configuration. Casting is not the right choice here. And the issue is about map1, not conf. –
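The comment is right that the cast cannot fix this: at runtime a plain Configuration instance is not a JobConf, so the downcast throws ClassCastException. A toy sketch with stand-in classes (mine, not the real Hadoop types) showing why:

```java
public class CastDemo {
    // Stand-ins mirroring the Hadoop hierarchy: JobConf extends Configuration.
    static class Configuration {}
    static class JobConf extends Configuration {}

    // Returns true only if the downcast to JobConf succeeds at runtime.
    static boolean castsToJobConf(Configuration c) {
        try {
            JobConf jc = (JobConf) c; // compiles, but may throw at runtime
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(castsToJobConf(new Configuration())); // plain Configuration: cast fails
        System.out.println(castsToJobConf(new JobConf()));       // actual JobConf: cast succeeds
    }
}
```

Casting only changes the static type the compiler sees; it never converts the object, which is why the accepted fix is to build the right type in the first place.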

+1

Your map class must extend org.apache.hadoop.mapred.Mapper and not org.apache.hadoop.mapreduce.Mapper – user864846

7

After a lot of "kung fu", I was able to use ChainMapper/ChainReducer. Thanks for the last comment, user864846.

/** 
* Licensed to the Apache Software Foundation (ASF) under one 
* or more contributor license agreements. See the NOTICE file 
* distributed with this work for additional information 
* regarding copyright ownership. The ASF licenses this file 
* to you under the Apache License, Version 2.0 (the 
* "License"); you may not use this file except in compliance 
* with the License. You may obtain a copy of the License at 
* 
*  http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 

package myPKG; 

/* 
* Ajitsen: Sample program for ChainMapper/ChainReducer. This program is modified version of WordCount example available in Hadoop-0.18.0. Added ChainMapper/ChainReducer and made to works in Hadoop 1.0.2. 
*/ 

import java.io.IOException; 
import java.util.Iterator; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.mapred.lib.ChainMapper; 
import org.apache.hadoop.mapred.lib.ChainReducer; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class ChainWordCount extends Configured implements Tool { 

    public static class Tokenizer extends MapReduceBase 
    implements Mapper<LongWritable, Text, Text, IntWritable> { 

     private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 

     public void map(LongWritable key, Text value, 
       OutputCollector<Text, IntWritable> output, 
       Reporter reporter) throws IOException { 
      String line = value.toString(); 
      System.out.println("Line:"+line); 
      StringTokenizer itr = new StringTokenizer(line); 
      while (itr.hasMoreTokens()) { 
       word.set(itr.nextToken()); 
       output.collect(word, one); 
      } 
     } 
    } 

    public static class UpperCaser extends MapReduceBase 
    implements Mapper<Text, IntWritable, Text, IntWritable> { 

     public void map(Text key, IntWritable value, 
       OutputCollector<Text, IntWritable> output, 
       Reporter reporter) throws IOException { 
      String word = key.toString().toUpperCase(); 
      System.out.println("Upper Case:"+word); 
      output.collect(new Text(word), value);  
     } 
    } 

    public static class Reduce extends MapReduceBase 
    implements Reducer<Text, IntWritable, Text, IntWritable> { 

     public void reduce(Text key, Iterator<IntWritable> values, 
       OutputCollector<Text, IntWritable> output, 
       Reporter reporter) throws IOException { 
      int sum = 0; 
      while (values.hasNext()) { 
       sum += values.next().get(); 
      } 
      System.out.println("Word:"+key.toString()+"\tCount:"+sum); 
      output.collect(key, new IntWritable(sum)); 
     } 
    } 

    static int printUsage() { 
     System.out.println("wordcount <input> <output>"); 
     ToolRunner.printGenericCommandUsage(System.out); 
     return -1; 
    } 

    public int run(String[] args) throws Exception { 
     JobConf conf = new JobConf(getConf(), ChainWordCount.class); 
     conf.setJobName("wordcount"); 

     if (args.length != 2) { 
      System.out.println("ERROR: Wrong number of parameters: " + 
        args.length + " instead of 2."); 
      return printUsage(); 
     } 
     FileInputFormat.setInputPaths(conf, args[0]); 
     FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

     conf.setInputFormat(TextInputFormat.class); 
     conf.setOutputFormat(TextOutputFormat.class); 

     JobConf mapAConf = new JobConf(false); 
     ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf); 

     JobConf mapBConf = new JobConf(false); 
     ChainMapper.addMapper(conf, UpperCaser.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf); 

     JobConf reduceConf = new JobConf(false); 
     ChainReducer.setReducer(conf, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf); 

     JobClient.runJob(conf); 
     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new Configuration(), new ChainWordCount(), args); 
     System.exit(res); 
    } 
} 
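To make the data flow of the chained job concrete, here is a plain-Java sketch (no Hadoop dependencies; the names are mine) of what Tokenizer → UpperCaser → Reduce computes for one line of input:

```java
import java.util.Map;
import java.util.TreeMap;

public class ChainSketch {
    // Simulates the chained pipeline: Tokenizer (split into words) ->
    // UpperCaser (uppercase the key) -> Reduce (sum the 1s emitted per key).
    static Map<String, Integer> chainWordCount(String line) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String token : line.trim().split("\\s+")) {
            if (token.isEmpty()) continue;
            counts.merge(token.toUpperCase(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(chainWordCount("the quick the Fox"));
    }
}
```

The boolean byValue flag in the real old-API ChainMapper.addMapper controls whether key/value objects are passed by value between chained mappers; this simulation ignores it, since each stage here works on fresh objects anyway.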

EDIT: In recent versions (at least since Hadoop 2.6), the true flag in addMapper is no longer needed. (In fact, the signature changed to drop it.)

So it becomes simply:

JobConf mapAConf = new JobConf(false); 
ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, 
         Text.class, IntWritable.class, mapAConf); 
0

Actually, the mapper class must implement the interface org.apache.hadoop.mapred.Mapper. I had the same problem, and this fixed it.