2011-03-27 72 views
12

**Accessing a mapper's counters from the reducer**

I need to access the counters from my mapper in my reducer. Is this possible? If so, how?

As an example, my mapper is:

public class CounterMapper extends Mapper<Text,Text,Text,Text> { 

    static enum TestCounters { TEST } 

    @Override 
    protected void map(Text key, Text value, Context context) 
        throws IOException, InterruptedException { 
     context.getCounter(TestCounters.TEST).increment(1); 
     context.write(key, value); 
    } 
} 

and my reducer is:

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> { 

    @Override 
    protected void reduce(Text key, Iterable<Text> values, Context context) 
         throws IOException, InterruptedException { 
     Counter counter = context.getCounter(CounterMapper.TestCounters.TEST); 
     long counterValue = counter.getValue(); 
     context.write(key, new LongWritable(counterValue)); 
    } 
} 

The value of the counter is always 0. Am I doing something wrong, or is this just not possible?

Answers

2

The whole point of map/reduce is to parallelize the job. There will be many unique mappers/reducers, so the value wouldn't be correct anyway except for that one run of the map/reduce pair.

They have a word count example:

http://wiki.apache.org/hadoop/WordCount

You could change the context.write(word, one) to context.write(line, one).

1

The global counter values are never broadcast back to each mapper or reducer. If you want the number of mapper records to be available to the reducer, you'll need to rely on some external mechanism to do this.
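Why a task-local counter can't simply be read elsewhere can be shown with a toy, Hadoop-free sketch (all class and method names here are made up for illustration): each "map task" keeps its own local count, and a global total only exists once a central aggregator, playing the JobTracker's role, sums the values that tasks report on completion.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class CounterAggregation {

    // Simulates one map task counting its own records.
    static long runMapper(List<String> records) {
        long localCounter = 0;          // visible only to this task
        for (String record : records) {
            localCounter++;             // stands in for getCounter(...).increment(1)
        }
        return localCounter;            // reported back when the task finishes
    }

    public static void main(String[] args) {
        List<List<String>> splits = List.of(
                List.of("a", "b"),
                List.of("c", "d", "e"));

        AtomicLong globalView = new AtomicLong(); // held centrally, not by any task
        for (List<String> split : splits) {
            globalView.addAndGet(runMapper(split));
        }
        System.out.println(globalView.get()); // 5 -- only the aggregate is global
    }
}
```

Another task reading its own counter object mid-job would only ever see its local value, which is why the reducer in the question sees 0.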

+0

The JobTracker will keep track of the counters. – 2011-03-28 04:26:07

9

In the Reducer's configure (JobConf), you can use the JobConf object to look up the Reducer's own job id. With that, your Reducer can create its own JobClient -- i.e. a connection to the job tracker -- and query the counters for this job (or any job, for that matter).

// in the Reducer class... 
private long mapperCounter; 

@Override 
public void configure(JobConf conf) { 
    JobClient client = new JobClient(conf); 
    RunningJob parentJob = 
     client.getJob(JobID.forName(conf.get("mapred.job.id"))); 
    mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME); 
} 

Now you can use mapperCounter inside the reduce() method itself.

You actually need a try/catch here. I'm using the old API, but it shouldn't be hard to adapt to the new API.

Note that the mappers' counters should all be finalized before any reducer starts, so contrary to Justin Thomas's comment, I believe you should get accurate values (as long as the reducers aren't incrementing the same counter!).

+0

It seems counterintuitive that counters from the mappers are not available in the reducer, but in Hadoop a reducer can start executing before all the mappers have finished. In that case, the value of a counter may be read differently at different times. To learn more about how reducers can start before mappers finish executing, see this post: http://stackoverflow.com/questions/11672676/when-do-reduce-tasks-start-in-hadoop – abhinavkulkarni 2013-10-10 19:09:34

+2

@abhinavkulkarni Actually, **only** the shuffle phase of the reducers can start before all the mappers have finished, which is irrelevant to counters. So, when the reduce phase of a reducer starts, all the map counters are correct. From the same post: "On the other hand, sort and reduce can only start once all the mappers are done." – vefthym 2014-05-12 14:22:47

8

An implementation of Jeff G's solution on the new API:

@Override 
public void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    Cluster cluster = new Cluster(conf);
    Job currentJob = cluster.getJob(context.getJobID());
    mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
}
+2

I tried this, but I get an error at the line mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME), where I replaced COUNTER_NAME with my custom counter. – 2015-12-09 05:32:51

+0

It seems that 'cluster.getJob(context.getJobID());' does not work in Hadoop's standalone operation. It did work for me when running in single-node cluster mode. – dauer 2016-11-23 17:08:11

1

I asked this question, but I haven't solved my problem. However, another solution came to mind. In the mapper, the number of words is counted, and in the cleanup function, which runs at the end of the mapper, it can be written to the intermediate output with the minimum possible key (so that this value ends up at the head). In the reducer, the total number of words is then obtained by reading the value at the head. Sample code and part of its output are provided below.

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

import java.io.IOException; 
import java.util.StringTokenizer; 

/** 
* Created by tolga on 1/26/16. 
*/ 
public class WordCount { 
    static enum TestCounters { TEST } 
    public static class Map extends Mapper<Object, Text, Text, LongWritable> { 
     private final static LongWritable one = new LongWritable(1); 
     private Text word = new Text(); 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
      String line = value.toString(); 
      StringTokenizer tokenizer = new StringTokenizer(line); 
      while (tokenizer.hasMoreTokens()) { 
       word.set(tokenizer.nextToken()); 
       context.write(word, one); 
       context.getCounter(TestCounters.TEST).increment(1); 
      } 
     } 

     @Override 
     protected void cleanup(Context context) throws IOException, InterruptedException { 
      context.write(new Text("!"),new LongWritable(context.getCounter(TestCounters.TEST).getValue())); 
     } 
    } 

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> { 

     public void reduce(Text key, Iterable<LongWritable> values, Context context) 
       throws IOException, InterruptedException { 
      int sum = 0; 
      for (LongWritable val : values) { 
       sum += val.get(); 
      } 
      context.write(key, new LongWritable(sum)); 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 

     Job job = new Job(conf, "WordCount"); 
     job.setJarByClass(WordCount.class); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(LongWritable.class); 

     job.setMapperClass(Map.class); 
     job.setReducerClass(Reduce.class); 

     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.waitForCompletion(true); 
    } 
} 

The text file:

Turgut Özal University is a private university located in Ankara, Turkey. It was established in 2008 by the Turgut Özal Thought and Action Foundation and is named after former Turkish president Turgut Özal. 

Intermediate output:

**! \t 33** 
2008 \t 1 
Action \t 1 
Ankara, \t 1 
Foundation \t 1 
It \t 1 
Thought \t 1 
Turgut \t 1 
Turgut \t 1 
Turgut \t 1

Final output:

**! \t 33** 
2008 \t 1 
Action \t 1 
Ankara, \t 1 
Foundation \t 1 
It \t 1 
Thought \t 1 
Turgut \t 3
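The "minimum key" trick above relies on the sort order of the shuffle: "!" compares lower than any alphanumeric token, so the total emitted in cleanup() is the first pair the reducer sees. A small, Hadoop-free sketch of that ordering, with a TreeMap standing in for the sorted shuffle output (names here are illustrative):

```java
import java.util.TreeMap;

public class MinKeyOrdering {
    public static void main(String[] args) {
        // TreeMap keeps keys sorted, like the keys arriving at a reducer.
        TreeMap<String, Long> sortedShuffle = new TreeMap<>();
        sortedShuffle.put("Turgut", 3L);
        sortedShuffle.put("!", 33L);      // total word count from cleanup()
        sortedShuffle.put("Action", 1L);

        // "!" sorts before every alphanumeric word, so the total comes first.
        System.out.println(sortedShuffle.firstKey()); // prints "!"
    }
}
```

Note this only guarantees the total arrives first within each partition; with more than one reducer, each partition would need its own per-partition total.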

0

An improvement on itzhaki's answer:

findCounter(COUNTER_NAME) is no longer supported – https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html

@Override 
public void setup(Context context) throws IOException, InterruptedException{ 
    Configuration conf = context.getConfiguration(); 
    Cluster cluster = new Cluster(conf); 
    Job currentJob = cluster.getJob(context.getJobID()); 
    mapperCounter = currentJob.getCounters().findCounter(GROUP_NAME, COUNTER_NAME).getValue(); 
} 

where GROUP_NAME is specified when the counter is incremented, e.g.

context.getCounter("com.example.mycode", "MY_COUNTER").increment(1); 

and then:

mapperCounter = currentJob.getCounters().findCounter("com.example.mycode", "MY_COUNTER").getValue(); 

Also, one important point: if the counter does not exist, one will be initialized with a value of 0.
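That get-or-create behaviour can be mimicked with a plain map (a toy sketch, not the Hadoop implementation; all names are illustrative): looking up a counter that was never incremented yields one initialized to 0 rather than an error.

```java
import java.util.HashMap;
import java.util.Map;

public class FindCounterSketch {
    static final Map<String, Long> counters = new HashMap<>();

    // Like findCounter(group, name): create the counter with value 0 if absent.
    static long findCounter(String group, String name) {
        return counters.computeIfAbsent(group + ":" + name, k -> 0L);
    }

    public static void main(String[] args) {
        // Never incremented, so the lookup silently creates it at 0.
        System.out.println(findCounter("com.example.mycode", "MY_COUNTER")); // prints 0
    }
}
```

This is why a typo in GROUP_NAME or COUNTER_NAME doesn't fail loudly: the reducer just reads a freshly created counter whose value is 0.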
