2015-09-05 40 views
-4

是否可以自定義地圖縮小框架實現。如果是,是否有任何指導,如接口實施和所有如何實現自定義地圖縮小框架

+0

你爲什麼要這麼做? –

+0

請更具體地瞭解您所尋求的自定義類型。 – Shatu

+0

Apache點燃有地圖縮小框架,所以想知道是否有標準機制 – Avinash

回答

0

你需要更具體。

Map Reduce框架允許您在地圖縮小過程中的幾乎所有步驟中提供您自己的類。像輸入和輸出類一樣,如何分割工作等等。默認情況下,hadoop包含幾個在幾乎所有情況下都足夠的實現,但如果不是,您可以提供自己的實現。

如果您希望對流程有更多的控制並避免MapReduce規範施加的限制,那麼您可以使用Yarn,但在這種情況下,您需要實現所有任務之間的協調/通信。 MapReduceV2在頂部紗線上運行。

如果你想在中間的東西,你可以使用框架,如http://twill.incubator.apache.org/

0

所以一般情況下,你將需要在地圖,減少工作的「定製化」的實施mapper和reducer類。開始瞭解這一點的最佳位置是https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

上述鏈接實現了一個wordcount示例,該示例基本上使用hadoop使用map-reduce框架計算跨文件的字數。

基本的「語法」的地圖,減少工作是:

public static class IntSumReducer 
    extends Reducer<Text,IntWritable,Text,IntWritable> { 
private IntWritable result = new IntWritable(); 

public void reduce(Text key, Iterable<IntWritable> values, 
        Context context 
        ) throws IOException, InterruptedException 

public static class IntSumReducer 
    extends Reducer<Text,IntWritable,enter code hereText,IntWritable> { 
private IntWritable result = new IntWritable(); 

public void reduce(Text key, Iterable<IntWritable> values, 
        Context context 
        ) throws IOException, InterruptedException 
{ 
    //Write reducer code here 
} 
} 

正如你可以在上面看到,地圖邏輯是在地圖的方法和減速器邏輯進入減少方法。你所有的定製都在這裏。當然,您還可以使用組合器方法來計算縮小階段,但是在本地級別。在Hadoop中

0

的hello world(簡單的地圖減少代碼的理解是如何工作的地圖降低範式)more detailed

import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.net.URI; 
import java.util.ArrayList; 
import java.util.HashSet; 
import java.util.List; 
import java.util.Set; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.Counter; 
import org.apache.hadoop.util.GenericOptionsParser; 
import org.apache.hadoop.util.StringUtils; 

public class WordCount2 { 

    public static class TokenizerMapper 
     extends Mapper<Object, Text, Text, IntWritable>{ 

    static enum CountersEnum { INPUT_WORDS } 

    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    private boolean caseSensitive; 
    private Set<String> patternsToSkip = new HashSet<String>(); 

    private Configuration conf; 
    private BufferedReader fis; 

    @Override 
    public void setup(Context context) throws IOException, 
     InterruptedException { 
     conf = context.getConfiguration(); 
     caseSensitive = conf.getBoolean("wordcount.case.sensitive", true); 
     if (conf.getBoolean("wordcount.skip.patterns", true)) { 
     URI[] patternsURIs = Job.getInstance(conf).getCacheFiles(); 
     for (URI patternsURI : patternsURIs) { 
      Path patternsPath = new Path(patternsURI.getPath()); 
      String patternsFileName = patternsPath.getName().toString(); 
      parseSkipFile(patternsFileName); 
     } 
     } 
    } 

    private void parseSkipFile(String fileName) { 
     try { 
     fis = new BufferedReader(new FileReader(fileName)); 
     String pattern = null; 
     while ((pattern = fis.readLine()) != null) { 
      patternsToSkip.add(pattern); 
     } 
     } catch (IOException ioe) { 
     System.err.println("Caught exception while parsing the cached file '" 
      + StringUtils.stringifyException(ioe)); 
     } 
    } 

    @Override 
    public void map(Object key, Text value, Context context 
        ) throws IOException, InterruptedException { 
     String line = (caseSensitive) ? 
      value.toString() : value.toString().toLowerCase(); 
     for (String pattern : patternsToSkip) { 
     line = line.replaceAll(pattern, ""); 
     } 
     StringTokenizer itr = new StringTokenizer(line); 
     while (itr.hasMoreTokens()) { 
     word.set(itr.nextToken()); 
     context.write(word, one); 
     Counter counter = context.getCounter(CountersEnum.class.getName(), 
      CountersEnum.INPUT_WORDS.toString()); 
     counter.increment(1); 
     } 
    } 
    } 

    public static class IntSumReducer 
     extends Reducer<Text,IntWritable,Text,IntWritable> { 
    private IntWritable result = new IntWritable(); 

    public void reduce(Text key, Iterable<IntWritable> values, 
         Context context 
         ) throws IOException, InterruptedException { 
     int sum = 0; 
     for (IntWritable val : values) { 
     sum += val.get(); 
     } 
     result.set(sum); 
     context.write(key, result); 
    } 
    } 

    public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); 
    String[] remainingArgs = optionParser.getRemainingArgs(); 
    if (!(remainingArgs.length != 2 | | remainingArgs.length != 4)) { 
     System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]"); 
     System.exit(2); 
    } 
    Job job = Job.getInstance(conf, "word count"); 
    job.setJarByClass(WordCount2.class); 
    job.setMapperClass(TokenizerMapper.class); 
    job.setCombinerClass(IntSumReducer.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 

    List<String> otherArgs = new ArrayList<String>(); 
    for (int i=0; i < remainingArgs.length; ++i) { 
     if ("-skip".equals(remainingArgs[i])) { 
     job.addCacheFile(new Path(remainingArgs[++i]).toUri()); 
     job.getConfiguration().setBoolean("wordcount.skip.patterns", true); 
     } else { 
     otherArgs.add(remainingArgs[i]); 
     } 
    } 
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0))); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1))); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 
相關問題