2013-11-26 55 views
7

我有一個簡單的mapreduce代碼與映射器,減速器和組合器。 映射器的輸出傳遞給組合器。但是對於reducer而言,不是來自combiner的輸出,而是來自mapper的輸出。Mapreduce組合器

請幫助

代碼:

package Combiner; 
import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.Mapper.Context; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 

public class AverageSalary 
{ 
public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable> 
{ 
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    {  
     String[] empDetails= value.toString().split(","); 
     Text unit_key = new Text(empDetails[1]);  
     DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2])); 
     context.write(unit_key,salary_value);  

    } 
} 
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text> 
{ 
    public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context) 
    { 
     String val; 
     double sum=0; 
     int len=0; 
     while (values.iterator().hasNext()) 
     { 
      sum+=values.iterator().next().get(); 
      len++; 
     } 
     val=String.valueOf(sum)+":"+String.valueOf(len); 
     try { 
      context.write(key,new Text(val)); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 
public static class Reduce extends Reducer<Text,Text, Text,Text> 
{ 
    public void reduce (final Text key, final Text values, final Context context) 
    { 
     //String[] sumDetails=values.toString().split(":"); 
     //double average; 
     //average=Double.parseDouble(sumDetails[0]); 
     try { 
      context.write(key,values); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 
public static void main(String args[]) 
{ 
    Configuration conf = new Configuration(); 
    try 
    { 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
    if (otherArgs.length != 2) {  
     System.err.println("Usage: Main <in> <out>");  
     System.exit(-1); }  
    Job job = new Job(conf, "Average salary");  
    //job.setInputFormatClass(KeyValueTextInputFormat.class);  
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
    job.setJarByClass(AverageSalary.class);  
    job.setMapperClass(Map.class);  
    job.setCombinerClass(Combiner.class); 
    job.setReducerClass(Reduce.class);  
    job.setOutputKeyClass(Text.class);  
    job.setOutputValueClass(Text.class);  

     System.exit(job.waitForCompletion(true) ? 0 : -1); 
    } catch (ClassNotFoundException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } catch (InterruptedException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
} 

}

+1

真的很難猜到沒有代碼的情況下會發生什麼。 – user987339

+0

代碼已被添加 – user2401464

回答

8

看來你忘了組合的大約重要的特性:

的輸入類型的鍵/值並且鍵/值的輸出類型需要相同。

您不能接受Text/DoubleWritable並返回Text/Text。我建議你使用Text而不是DoubleWritable,並在Combiner內部做適當的解析。

14

組合器的#1規則是:不認爲組合器將運行。僅將組合器僅作爲優化

合併器不保證能夠運行所有數據。在某些情況下,當數據不需要傳輸到磁盤時,MapReduce將完全跳過使用組合器。還要注意組合器可能會在數據的子集上運行多次!它會在每次泄漏時運行一次。

就你而言,你正在做出這個糟糕的假設。您應該在Combiner和Reducer中完成總和。

此外,你也應該遵循@ user987339的答案。組合器的輸入和輸出需要相同(文本,雙精度 - >文本,雙精度),它需要匹配Mapper的輸出和Reducer的輸入。

+0

我覺得,這應該被標記爲接受答案。沒有冒犯,很好的答案。 – Azim

0

Combiner在運行mapreduce時不會總是有效。

如果至少有三個溢出文件(寫入本地磁盤的映射器的輸出),組合器將執行,以便可以減小文件的大小,以便它可以輕鬆傳輸到節點。

針對其組合器需要運行可通過min.num.spills.for.combine屬性

1

設置如果一個組合功能用於泄漏的數量,則它是一樣的形式減少功能(並且是 減速器的一種實施),除了它的輸出類型是中間鍵和值類型(K2和V2),所以它們可以輸入reduce函數: map:(K1,V1)→list(K2,V2) combine:(K2,list (V2))→list(K2,V2) reduce:(K2,list(V2))→list(K3,V3) 組合函數和歸約函數常常相同,在這種情況下,K3與 K2和V3與V2相同。

相關問題