2014-03-25 41 views
0

Hadoop和HBase的新功能。讓我用一個例子來解釋我的問題。爲簡潔起見,數據變得很小。從Reducer中的HBase中讀取數據

假設我們有一個名爲item.log的文件,它包含以下信息。

ITEM-1,PRODUCT-1 
ITEM-2,PRODUCT-1 
ITEM-3,PRODUCT-2 
ITEM-4,PRODUCT-2 
ITEM-5,PRODUCT-3 
ITEM-6,PRODUCT-1 
ITEM-7,PRODUCT-1 
ITEM-8,PRODUCT-2 
ITEM-9,PRODUCT-1 

我有如下的地圖減少代碼,

package org.sanjus.hadoop; 

import java.io.IOException; 
import java.util.Iterator; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.TextInputFormat; 
import org.apache.hadoop.mapred.TextOutputFormat; 

public class ProductMapReduce { 

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> { 

     public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { 
      String[] columns = value.toString().split(","); 

      if (columns.length != 2) { 
       System.out.println("Bad line/value " + value); 
       return; 
      } 

      Text word = new Text(columns[1]); 
      LongWritable counter = new LongWritable(1L); 

      output.collect(word, counter); 
     } 
    } 


    public static class Reduce extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> { 

     public void reduce(Text key, Iterator<LongWritable> iterator, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { 
      long sum = 0L; 

      while (iterator.hasNext()) { 
       sum += iterator.next().get(); 
      } 
      output.collect(key, new LongWritable(sum)); 
     } 

    } 

    public static void main(String[] args) throws IOException { 
     JobConf conf = new JobConf(ProductMapReduce.class); 
     conf.setJobName("Product Analyzer"); 

     conf.setOutputKeyClass(Text.class); 
     conf.setOutputValueClass(LongWritable.class); 

     conf.setMapperClass(Map.class); 
     conf.setCombinerClass(Reduce.class); 
     conf.setReducerClass(Reduce.class); 

     conf.setInputFormat(TextInputFormat.class); 
     conf.setOutputFormat(TextOutputFormat.class); 

     FileInputFormat.setInputPaths(conf, new Path(args[0])); 
     FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

     JobClient.runJob(conf); 
    } 
} 

LABEL 1:地圖後輸出減少低於:

PRODUCT-1 5 
PRODUCT-2 3 
PRODUCT-3 1 

這裏是一個問題:

我在HBase中有一個表,它具有以下信息如下。

PRODUCT-1 10$ 
PRODUCT-2 20$ 
PRODUCT-3 30$ 

問題/需求:我想要的降低相的輸出作爲減少輸出的合併在「LABEL 1:」及以上

HBase的表中規定
PRODUCT-1 10$ * 5 = 50$ 
PRODUCT-2 20$ * 3 = 60$ 
PRODUCT-3 30$ * 1 = 30$ 

基本上,密鑰是PRODUCT-1,該密鑰的值爲10 $,同一密鑰的值爲5,兩個值相乘。 ($符號是爲了解)

注:我發現的例子是基於輸入或輸出到HBase。我的情況是,輸入和輸出將是HDFS中的文件,而我需要使用HBase表中的信息處理reducer輸出。

回答

1

由於HBase支持高讀取吞吐量,並且您只想讀取reducer中的數據(將使用它們的受控數目): 您可以使用HBase API根據reducer的關鍵字從表中讀取數據。由於Hbase中的讀取速度很快(取決於所讀取數據的大小,約10ms),我認爲您的性能不會受到影響。 只需確保在reducer的configure()方法中初始化配置& HTable。

0

這是我做的,

我減速類中,我加入了重載方法「設置」

private HTable htable; 

private Configuration config; 

protected void setup(Context context) throws IOException, InterruptedException { 
    Configuration config = HBaseConfiguration.create(); 
    config.addResource(new Path("/etc/hbase/conf.hbase1/hbase-site.xml")); 
    try { 
     htable = new HTable(config, "MY_TABLE"); 
    } 
    catch (IOException e) { 
     System.out.println("Error getting table from HBase", e); 
    } 

} 

使用HTable.get API,我得到的結果對象。

+0

在你的reducer中,你擴展了TableReducer類嗎? – Shash

+0

@shash,我在我的reduce實現中擴展了'org.apache.hadoop.mapreduce.Reducer'類。 –