
Hadoop mapreduce - reducer not running

I am trying to write a custom bulk-load MapReduce job into HBase and have run into a problem with the reducer. At first I assumed I had simply written the reducer badly, but after throwing a runtime exception inside the reducer and seeing the job still complete normally, I realized the reducer was not running at all. So far none of the usual checks for this kind of problem have turned anything up:

  1. My configuration sets the map output and the final output classes separately
  2. My reducer and mapper have @Override
  3. My reducer input is an Iterable of (Writable, Put), so...

Here is my code:

Driver

public int run(String[] args) throws Exception { 
    int result=0; 
    String outputPath = args[1]; 
    Configuration configuration = getConf(); 
    configuration.set("data.seperator", DATA_SEPERATOR); 
    configuration.set("hbase.table.name",TABLE_NAME); 
    configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1); 
    Job job = new Job(configuration); 
    job.setJarByClass(HBaseBulkLoadDriver.class); 
    job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job.setMapperClass(HBaseBulkLoadMapper.class); 
    job.setReducerClass(HBaseBulkLoadReducer.class); 
    job.setOutputKeyClass(ImmutableBytesWritable.class); 
    job.setOutputValueClass(Put.class); 
    FileInputFormat.addInputPaths(job, args[0]); 
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true); 
    FileOutputFormat.setOutputPath(job, new Path(outputPath)); 
    job.setMapOutputValueClass(Put.class); 
    job.setNumReduceTasks(1); 
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME)); 
    job.waitForCompletion(true);
    return result;
}

Mapper

public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { 
    private String hbaseTable; 
    private String dataSeperator; 
    private String columnFamily1; 
    private ImmutableBytesWritable hbaseTableName; 

    public void setup(Context context) { 
     Configuration configuration = context.getConfiguration(); 
     hbaseTable = configuration.get("hbase.table.name"); 
     dataSeperator = configuration.get("data.seperator"); 
     columnFamily1 = configuration.get("COLUMN_FAMILY_1"); 
     hbaseTableName = new ImmutableBytesWritable(Bytes.toBytes(hbaseTable)); 
    } 
     @Override 
    public void map(LongWritable key, Text value, Context context) { 
     try { 
      String[] values = value.toString().split(dataSeperator); 
      String rowKey = values[0]; 
      Put put = new Put(Bytes.toBytes(rowKey)); 
      // BUNCH OF ADDS;
      context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put); 
     } catch(Exception exception) { 
      exception.printStackTrace(); 
     } 
    } 
} 

Reducer

public class HBaseBulkLoadReducer extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> { 
     @Override 
     protected void reduce(
      ImmutableBytesWritable row, 
      Iterable<Put> puts, 
      Reducer<ImmutableBytesWritable, Put, 
        ImmutableBytesWritable, Put>.Context context) 
      throws java.io.IOException, InterruptedException 
     {
      TreeMap<String, KeyValue> map = new TreeMap<String, KeyValue>();
      int count = 0;
      Append nkv;
      byte[] tmp = "".getBytes();
      Put pp = new Put(tmp);
      try {
       for (Put p : puts) {
        byte[] r = "".getBytes();
        //KeyValue kv = new KeyValue(r);
        if (count != 0) {
         r = p.getRow();
         pp.add(new KeyValue(r));
         //KeyValue k = map.get(row.toString());
         //nkv = new Append(k.getRowArray());
         //nkv = nkv.add(kv);
         //map.put(row.toString(), k.clone());
         //context.write(row, nkv);
         //tmp = ArrayUtils.addAll(tmp, kv.getValueArray());
         //map.put(row.toString(), new KeyValue(kv.getRowArray(), kv.getFamilyArray(), kv.getQualifierArray(), tmp));
         count++;
         throw new RuntimeException();
        } else {
         r = p.getRow();
         pp = new Put(row.toString().getBytes());
         pp.add(new KeyValue(r));
         //tmp = kv.clone().getValueArray();
         //nkv = new Append(kv.getRowArray());
         //map.put(row.toString(), kv.clone());
         count++;
         throw new RuntimeException();
        }
       }
       context.write(row, pp);
      } catch (Exception e) {
       e.printStackTrace();
      }
     }

} 

Well, I know the reducer is a bit of a mess, but the fact is that it throws a RuntimeException in both the if and the else clause, as you can see, and the bulk load still completes successfully, so I am pretty sure the reducer never runs - and I cannot figure out why. All three files are packaged by Maven in the same directory, FYI.


Why are you deliberately doing 'throw new RuntimeException();'? –


He is trying to see whether the block gets executed... "but after throwing a runtime exception in the reducer and seeing the code work fine, I realized the reducer was not running" – Tgsmith61591


I would expect the reducer to run because of 'job.setNumReduceTasks(1)', but if 'Iterable puts' is empty, the for loop in the reducer is never entered and those exceptions are never thrown –
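For reference, one low-impact way to check both points at once - whether reduce() is ever invoked and whether it ever receives an empty Iterable - is to increment custom counters instead of throwing; counters show up in the job's counter summary even when the job succeeds. A minimal sketch that could temporarily replace the reduce body in HBaseBulkLoadReducer (the "DEBUG" group and counter names are made up for illustration):

@Override
protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, Context context)
        throws java.io.IOException, InterruptedException {
    // incremented on every call, so it proves whether reduce() runs at all
    context.getCounter("DEBUG", "reduce-invocations").increment(1);
    int count = 0;
    for (Put p : puts) {
        count++;
    }
    // incremented only when the Iterable yields no values
    if (count == 0) {
        context.getCounter("DEBUG", "empty-put-iterables").increment(1);
    }
}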

Answer


Figured out what was wrong. configureIncrementalLoad sets the reducer class to PutSortReducer or KeyValueSortReducer depending on the output value class, so if you want to use a custom reducer class you have to set it after configureIncrementalLoad. After doing that I could see my reducer running. Just answering my own question so it can help anyone who hits the same problem.

HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME)); 
job.setReducerClass(HBaseBulkLoadReducer.class); 
job.waitForCompletion(true);
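
For completeness, the tail of the run() method with the corrected ordering would look roughly like this (same job, table, and class names as in the question; the getReducerClass() print is only an optional sanity check, not part of the fix):

// configureIncrementalLoad() installs its own reducer (PutSortReducer or
// KeyValueSortReducer), the HFileOutputFormat and a TotalOrderPartitioner
HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration, TABLE_NAME));

// ...so the custom reducer has to be set afterwards, or it is silently replaced
job.setReducerClass(HBaseBulkLoadReducer.class);

// optional sanity check: should print HBaseBulkLoadReducer, not PutSortReducer
System.out.println("Reducer in effect: " + job.getReducerClass().getName());

job.waitForCompletion(true);
return result;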