2012-10-23 75 views
3

我需要使用map reduce實現功能。Hadoop Map Reduce,如何將第一個減速器輸出和第一個映射輸入結合起來,作爲第二個映射器的輸入?

要求在下面提到。

  1. 輸入相關的映射器是一個包含兩分的productId一個文件,Salescount
  2. 減速器輸出,salescount的總和

要求是我需要計算salescount /總和(salescount)。

爲此,我打算使用嵌套地圖縮小。 但是對於第二個映射器,我需要使用第一個reducer輸出和第一個映射的輸入。

我該如何執行此操作。或者有其他方法嗎?

問候 Vinu

回答

3

您可以使用ChainMapperChainReducer管道映射器和減縮你想要的方式。請看看here

下會出現類似的代碼段,你需要實現

JobConf mapBConf = new JobConf(false); 

JobConf reduceConf = new JobConf(false); 

ChainMapper.addMapper(conf, FirstMapper.class, FirstMapperInputKey.class, FirstMapperInputValue.class, 
    FirstMapperOutputKey.class, FirstMapperOutputValue.class, false, mapBConf); 

ChainReducer.setReducer(conf, FirstReducer.class, FirstMapperOutputKey.class, FirstMapperOutputValue.class, 
    FirstReducerOutputKey.class, FirstReducerOutputValue.class, true, reduceConf); 

ChainReducer.addMapper(conf, SecondMapper.class, FirstReducerOutputKey.class, FirstReducerOutputValue.class, 
    SecondMapperOutputKey.class, SecondMapperOutputValue.class, false, null); 

ChainReducer.setReducer(conf, SecondReducer.class, SecondMapperOutputKey.class, SecondMapperOutputValue.class, SecondReducerOutputKey.class, SecondReducerOutputValue.class, true, reduceConf); 

,或者如果你不想使用多個映射器和減壓器,你可以做以下

public static class ProductIndexerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> { 

    private static Text productId = new Text(); 
    private static LongWritable salesCount = new LongWritable(); 

    @Override 
    public void map(LongWritable key, Text value, 
      OutputCollector<Text, LongWritable> output, Reporter reporter) 
      throws IOException { 
     String[] values = value.toString().split("\t"); 
     productId.set(values[0]);   
     salesCount.set(Long.parseLong(values[1])); 
     output.collect(productId, salesCount); 
    } 

} 

public static class ProductIndexerReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> { 

    private static LongWritable productWritable = new LongWritable(); 

    @Override 
    public void reduce(Text key, Iterator<LongWritable> values, 
      OutputCollector<Text, LongWritable> output, Reporter reporter) 
      throws IOException { 
     List<LongWritable> items = new ArrayList<LongWritable>(); 
     long total = 0; 
     LongWritable item = null; 
     while(values.hasNext()) { 
      item = values.next(); 
      total += item.get(); 
      items.add(item); 
     } 
     Iterator<LongWritable> newValues = items.iterator(); 
     while(newValues.hasNext()) { 
      productWritable.set(newValues.next().get()/total); 
      output.collect(key, productWritable); 
     } 
    } 

} 

`

+0

由於@shazin提到的作業鏈不是這種情況下需要的。對於許多這樣的算法,請檢查[使用MapReduce的數據密集型文本處理 ](http://lintool.github.com/MapReduceAlgorithms/index.html)。這將有助於以MapReduce的方式進行思考。 –

+0

Vinu,你能寫一個例子嗎? 假設有輸入文件作爲 1 ---> 23
1 ---> 33
1 ---> 22
1 ---> 2
2 --- > 3
2 ---> 4
2 ---> 5
你期望的輸出? (23 + 33 + 22 + 2)/(23 + 33 + 22 + 2 + 3 + 4 + 5)
2 --->(3 + 4 + 5)/ + 33 + 22 + 2 + 3 + 4 + 5)
這是你的問題? –

0

在手的用例,我相信我們不需要兩個不同的映射器/ MapReduce工作來實現這一目標。 (作爲上述評論中給出的答案的擴展)

讓我們假設您有一個非常大的輸入文件,將其分割成HDFS中的多個塊。當您用此文件作爲輸入觸發Ma​​pReduce作業時,多個映射器(等於輸入塊的數量)將並行開始執行。

在您的映射器實現中,從輸入讀取每行,並將productId作爲關鍵字並將saleCount作爲值寫入上下文。這些數據被傳遞給Reducer。

我們知道,在一個MR作業中,所有具有相同密鑰的數據都被傳遞給同一個reducer。現在,在您的Reducer實施中,您可以計算特定productId的所有saleCounts的總和。

注意:我不確定分子中的'salescount'值。

假設它是特定產品發生次數的計數,請使用計數器在計算SUM(saleCount)的同一循環中添加並獲取總銷售計數。所以,我們有

totalCount - >產品發生次數的計數 sumSaleCount - >每個產品的銷售額的總和。

現在,您可以直接劃分上述值:totalCount/sumSaleCount。

希望這會有所幫助!請讓我知道,如果你有一個不同的使用案例。

相關問題