
I'm writing a Hadoop program, and I really don't want to play with deprecated classes. I haven't been able to find an example anywhere that has been updated to use the

org.apache.hadoop.conf.Configuration

class instead of the deprecated

org.apache.hadoop.mapred.JobConf

class.

public static void main(String[] args) throws Exception { 
    JobConf conf = new JobConf(Test.class); 
    conf.setJobName("TESST"); 

    conf.setOutputKeyClass(Text.class); 
    conf.setOutputValueClass(IntWritable.class); 

    conf.setMapperClass(Map.class); 
    conf.setCombinerClass(Reduce.class); 
    conf.setReducerClass(Reduce.class); 

    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(conf, new Path(args[0])); 
    FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

    JobClient.runJob(conf); 
    } 

This is what my main() looks like. Can anyone please provide me with the updated version of this function?


Possible duplicate of [Run Hadoop job without using JobConf](http://stackoverflow.com/questions/2115292/run-hadoop-job-without-using-jobconf) – chess007 2011-12-22 12:30:31


No, that one is similar, but I want an example using the Configuration class, which is the replacement for the JobConf class. – CodeBanger 2011-12-22 13:08:34

Answers

18

Here is the classic WordCount example. You will notice other imports as well that may not be necessary; reading the code you will figure out which is which.

What's different? I use the Tool interface and GenericOptionsParser to parse the job command, a.k.a.: hadoop jar ....

In the mapper you will notice a run() method. You can get rid of it, since the default implementation is what normally runs when you only supply the map() method; I put it there to show that you can take further control of the map phase. This is all using the new API. Hope it helps. Any other questions, let me know!

import java.io.IOException; 
import java.util.*; 

import org.apache.commons.io.FileUtils; 
import org.apache.hadoop.conf.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 

import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.apache.hadoop.util.GenericOptionsParser; 

public class Inception extends Configured implements Tool{ 

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { 
    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     String line = value.toString(); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     while (tokenizer.hasMoreTokens()) { 
      word.set(tokenizer.nextToken()); 
      context.write(word, one); 
     } 
    } 

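    // Overriding run() is optional; this mirrors Mapper's default run() loop and is here only to show that the map phase can be controlled further. 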
    public void run (Context context) throws IOException, InterruptedException { 
     setup(context); 
     while (context.nextKeyValue()) { 
       map(context.getCurrentKey(), context.getCurrentValue(), context); 
      } 
     cleanup(context); 
    } 
} 

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { 

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
     throws IOException, InterruptedException { 
     int sum = 0; 
     for (IntWritable val : values) { 
      sum += val.get(); 
     } 
     context.write(key, new IntWritable(sum)); 
    } 
} 

public int run(String[] args) throws Exception { 

    Job job = Job.getInstance(getConf()); // use the Configuration handed in via Configured/ToolRunner 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 

    job.setMapperClass(Map.class); 
    job.setReducerClass(Reduce.class); 

    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.setJarByClass(Inception.class); 

    job.submit(); 
    return 0; 
    } 

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    ToolRunner.run(conf, new Inception(), otherArgs); 
} 
} 
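
For reference, a job wired up through ToolRunner like this is typically launched with something along the lines of: hadoop jar yourjob.jar Inception <input path> <output path> (the jar name and paths here are placeholders). Generic options such as -D property=value are consumed by GenericOptionsParser/ToolRunner before the remaining arguments reach args[0] and args[1].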

thnx for the quick response, buddy. I'll try it out and then accept your answer. – CodeBanger 2011-12-22 17:26:36


I may have forgotten some closing brackets. In any case, any build errors should be straightforward to sort out! – inquire 2011-12-22 17:33:04


Yep... will fix it and get back to you. ;-) – CodeBanger 2011-12-22 17:33:52

1

Also taking the classic word count as an example:

org.apache.hadoop.mapred.JobConf is the old API; in the new version we use Configuration and Job instead.

Please use org.apache.hadoop.mapreduce.lib.* (the new API) rather than org.apache.hadoop.mapred.TextInputFormat (the old one).

The Mapper and Reducer are nothing new here; look at the main function, which contains a fairly complete job configuration. Feel free to change it to fit your specific requirements.

import java.io.IOException; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { 
    private Text outputKey; 
    private IntWritable outputVal; 

    @Override 
    public void setup(Context context) { 
    outputKey = new Text(); 
    outputVal = new IntWritable(1); 
    } 

    @Override 
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
    StringTokenizer stk = new StringTokenizer(value.toString()); 
    while(stk.hasMoreTokens()) { 
     outputKey.set(stk.nextToken()); 
     context.write(outputKey, outputVal); 
    } 
    } 
} 

class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
    private IntWritable result; 

    @Override 
    public void setup(Context context) { 
    result = new IntWritable(); 
    } 

    @Override 
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 
    int sum = 0; 
    for(IntWritable val: values) { 
     sum += val.get(); 
    } 
    result.set(sum); 
    context.write(key, result); 
    } 
} 

public class WordCount { 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
    Configuration conf = new Configuration(); 
    if(args.length != 2) { 
     System.err.println("Usage: <in> <out>"); 
     System.exit(2); 
    } 
    Job job = Job.getInstance(conf, "Word Count"); 

    // set jar 
    job.setJarByClass(WordCount.class); 

    // set Mapper, Combiner, Reducer 
    job.setMapperClass(TokenizerMapper.class); 
    job.setCombinerClass(IntSumReducer.class); 
    job.setReducerClass(IntSumReducer.class); 

    /* Optional: set a custom Partitioner, e.g.: 
     * job.setPartitionerClass(MyPartitioner.class); 
     */ 

    // set output key 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(IntWritable.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 

    // set input and output path 
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    // by default, Hadoop uses TextInputFormat and TextOutputFormat 
    // any custom input/output format must extend the InputFormat/OutputFormat abstract classes 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
}
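
The setPartitionerClass call in the comment above is only a placeholder. As a rough sketch (the MyPartitioner name is just that placeholder, and the logic below simply reproduces what Hadoop's default HashPartitioner already does), a custom partitioner matching this job's map output types could look like:

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Partitioner; 

// Hypothetical custom partitioner for the Text/IntWritable map output above. 
// This version hash-partitions on the key, like the default HashPartitioner does; 
// real job-specific routing logic would go here instead. 
class MyPartitioner extends Partitioner<Text, IntWritable> { 
    @Override 
    public int getPartition(Text key, IntWritable value, int numPartitions) { 
     return (key.hashCode() & Integer.MAX_VALUE) % numPartitions; 
    } 
} 

You would then uncomment the job.setPartitionerClass(MyPartitioner.class) line in main to plug it in.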