2012-11-16 21 views
0

我想在不使用任何減速器的情況下讀寫hbase。讀/寫沒有減速器的Hbase,例外

我遵循「Apache HBase™參考指南」中的示例,但也有例外。

這裏是我的代碼:

public class CreateHbaseIndex { 
static final String SRCTABLENAME="sourceTable"; 
static final String SRCCOLFAMILY="info"; 
static final String SRCCOL1="name"; 
static final String SRCCOL2="email"; 
static final String SRCCOL3="power"; 

static final String DSTTABLENAME="dstTable"; 
static final String DSTCOLNAME="index"; 
static final String DSTCOL1="key"; 
public static void main(String[] args) { 
    System.out.println("CreateHbaseIndex Program starts!..."); 
    try { 
     Configuration config = HBaseConfiguration.create(); 
     Scan scan = new Scan(); 
     scan.setCaching(500); 
     scan.setCacheBlocks(false); 
     scan.addColumn(Bytes.toBytes(SRCCOLFAMILY), Bytes.toBytes(SRCCOL1));//info:name 
     HBaseAdmin admin = new HBaseAdmin(config); 
     if (admin.tableExists(DSTTABLENAME)) { 
      System.out.println("table Exists."); 
     } 
     else{ 
      HTableDescriptor tableDesc = new HTableDescriptor(DSTTABLENAME); 
      tableDesc.addFamily(new HColumnDescriptor(DSTCOLNAME)); 
      admin.createTable(tableDesc); 
      System.out.println("create table ok."); 
     } 
     Job job = new Job(config, "CreateHbaseIndex"); 
     job.setJarByClass(CreateHbaseIndex.class); 
     TableMapReduceUtil.initTableMapperJob(
       SRCTABLENAME, // input HBase table name 
       scan, // Scan instance to control CF and attribute selection 
       HbaseMapper.class, // mapper 
       ImmutableBytesWritable.class, // mapper output key 
       Put.class, // mapper output value 
       job); 
     job.waitForCompletion(true); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } catch (ClassNotFoundException e) { 
     e.printStackTrace(); 
    } 
    System.out.println("Program ends!..."); 
} 

public static class HbaseMapper extends TableMapper<ImmutableBytesWritable, Put> { 
    private HTable dstHt; 
    private Configuration dstConfig; 
    @Override 
    public void setup(Context context) throws IOException{ 
     dstConfig=HBaseConfiguration.create(); 
     dstHt = new HTable(dstConfig,SRCTABLENAME); 
    } 

    @Override 
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { 
     // this is just copying the data from the source table... 
     context.write(row, resultToPut(row,value)); 
    } 

    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException { 
     Put put = new Put(key.get()); 
     for (KeyValue kv : result.raw()) { 
      put.add(kv); 
     } 
     return put; 
    } 

    @Override 
    protected void cleanup(Context context) throws IOException, InterruptedException { 
     dstHt.close(); 
     super.cleanup(context); 
    } 
} 
} 

順便說一句, 「souceTable」 是這樣的:

key name email 
1 peter [email protected] 
2 sam  [email protected] 

「dstTable」 將是這樣的:

key value 
peter 1 
sam 2 

我在這個領域是一個新手,需要你的幫助。 Thx〜

+0

減速機是你寫東西的地方。你爲什麼不想在reducer中寫入hbase? –

+0

@ChrisGerken我認爲mapper可以完成我需要的所有事情。 – zhoutall

回答

0

你是對的,你不需要reducer來寫入HBase,但是有一些reducer可能會有所幫助。如果您正在創建索引,則可能遇到兩個映射器試圖寫入同一行的情況。除非您小心確保它們正在寫入不同的列限定符,否則由於競爭條件可能會覆蓋另一個更新。儘管HBase確實會進行行級鎖定,但如果您的應用程序邏輯出錯,它將無濟於事。

在沒有看到您的例外的情況下,我猜測您失敗了,因爲您正在嘗試將源表中的鍵值對寫入索引表(其中列族不存在)。

0

在此代碼中,您未指定輸出格式。您需要添加以下代碼

job.setOutputFormatClass(TableOutputFormat.class); 

    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, 
      DSTTABLENAME); 

而且,我們不應該在創建新的配置設置,我們需要使用相同的配置,從上下文。