2012-06-14 22 views
4

我遇到了一個非常奇怪的問題。減速器可以工作,但如果我檢查輸出文件,我只能找到映射器的輸出。 當我試圖調試,我發現字計數樣品同樣的問題後,我從Longwritable改變映射器輸出值類型設置爲文本Hadoop:Reducer將Mapper輸出寫入輸出文件

package org.myorg; 

import java.io.IOException; 
import java.util.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.mapreduce.lib.input.*; 
import org.apache.hadoop.mapreduce.lib.output.*; 
import org.apache.hadoop.util.*; 

public class WordCount extends Configured implements Tool { 

    public static class Map 
     extends Mapper<LongWritable, Text, Text, Text> { 
    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    public void map(LongWritable key, Text wtf, Context context) 
     throws IOException, InterruptedException { 
     String line = wtf.toString(); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     while (tokenizer.hasMoreTokens()) { 
     word.set(tokenizer.nextToken()); 
     context.write(word, new Text("frommapper")); 
     } 
    } 
    } 

    public static class Reduce 
     extends Reducer<Text, Text, Text, Text> { 
    public void reduce(Text key, Text wtfs, 
     Context context) throws IOException, InterruptedException { 
/* 
     int sum = 0; 
     for (IntWritable val : wtfs) { 
     sum += val.get(); 
     } 
     context.write(key, new IntWritable(sum));*/ 
    context.write(key,new Text("can't output")); 
    } 
    } 

    public int run(String [] args) throws Exception { 
    Job job = new Job(getConf()); 
    job.setJarByClass(WordCount.class); 
    job.setJobName("wordcount"); 


    job.setOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 
     job.setOutputValueClass(Text.class); 
    job.setMapperClass(Map.class); 
    //job.setCombinerClass(Reduce.class); 
    job.setReducerClass(Reduce.class); 

    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    boolean success = job.waitForCompletion(true); 
    return success ? 0 : 1; 
     } 

    public static void main(String[] args) throws Exception { 
    int ret = ToolRunner.run(new WordCount(), args); 
    System.exit(ret); 
    } 
} 

這裏的結果

JobClient:  Combine output records=0 
12/06/13 17:37:46 INFO mapred.JobClient:  Map input records=7 
12/06/13 17:37:46 INFO mapred.JobClient:  Reduce shuffle bytes=116 
12/06/13 17:37:46 INFO mapred.JobClient:  Reduce output records=7 
12/06/13 17:37:46 INFO mapred.JobClient:  Spilled Records=14 
12/06/13 17:37:46 INFO mapred.JobClient:  Map output bytes=96 
12/06/13 17:37:46 INFO mapred.JobClient:  Combine input records=0 
12/06/13 17:37:46 INFO mapred.JobClient:  Map output records=7 
12/06/13 17:37:46 INFO mapred.JobClient:  Reduce input records=7 

然後我在outfile中發現了奇怪的結果。無論是否更改減少輸出值的類型,我都將映射的輸出值類型和減速器的輸入鍵類型更改爲文本後發生此問題。我也被迫改變job.setOutputValue(Text.class)

a frommapper 
a frommapper 
a frommapper 
gg frommapper 
h frommapper 
sss frommapper 
sss frommapper 

幫助!

+1

如果reduce函數的簽名應該是這樣的:reduce(KEYIN key,Iterable values,Reducer.Context context)?那裏沒有可迭代的部分。 –

回答

4

你減少函數的參數應該如下:

public void reduce(Text key, Iterable <Text> wtfs, 
    Context context) throws IOException, InterruptedException { 

隨着你定義的參數的方式,減少操作是沒有得到值的列表,因此它只是輸出輸入什麼它得到地圖功能,因爲

sum+ = val.get() 

只是從0到1時都需要,因爲每個<key, value>對形式<word, one>分開來減速。

此外,映射函數通常不會寫入輸出文件(我從來沒有聽說過它,但我不知道這是否可能)。在通常情況下,它總是寫入輸出文件的減速器。映射器輸出是Hadoop透明處理的中間數據。所以如果你在輸出文件中看到某些東西,那必須是reducer輸出,而不是mapper輸出。如果你想驗證這一點,你可以去日誌中找出你所運行的工作,然後逐個檢查每個映射器和reducer中發生了什麼。

希望這能爲你解決一些問題。

+1

我認爲這是正確的,我沒有看到任何情況下,當在程序中明確定義reduce函數時,地圖輸出直接寫入輸出文件 – 2012-06-14 04:44:14

+1

非常感謝!這正是問題所在。順便說一下,當作業配置中使用job.setNumReduceTasks(0)時,映射器只能寫入文件。那時候,輸出文件名包含'-m'而不是'-r' –

+0

啊,謝謝你告訴我,我不知道那個:) – Chaos