2017-03-03 128 views
2

acutally我在MapReduce和Bulkload的幫助下將數據加載到Hbase中,這是我在Java中實現的。 所以基本上我創建了一個映射器並使用HFileOutputFormat2.configureIncrementalLoad(完整代碼在問題的末尾)進行約簡,我使用一個映射器,它只是從文件中讀取一些字節並創建一個put。寫出這個使用LoadIncrementalHFiles.doBulkLoad寫入數據到Hbase。這一切都很好。但是肯定的時候,它會覆蓋Hbase中的舊值。所以我正在尋找一種方法來追加數據,就像api工程中的追加函數一樣。 感謝您的閱讀,希望你們當中有些人有一個想法,可以幫助我:)Hbase Bulkload追加數據而不是覆蓋它們

public int run(String[] args) throws Exception { 
    int result=0; 
    String outputPath = args[1]; 
    Configuration configuration = getConf(); 
    configuration.set("data.seperator", DATA_SEPERATOR); 
    configuration.set("hbase.table.name",TABLE_NAME); 
    configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1); 
    configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2); 

    Job job = Job.getInstance(configuration); 
    job.setJarByClass(HBaseBulkLoadDriver.class); 
    job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job.setMapperClass(HBaseBulkLoadMapper.class); 

    FileInputFormat.addInputPaths(job, args[0]); 
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true); 
    HFileOutputFormat2.setOutputPath(job,new Path((outputPath))); 
    job.setMapOutputValueClass(Put.class); 
    Connection c = ConnectionFactory.createConnection(configuration); 
    Table t = c.getTable(TableName.valueOf(TABLE_NAME)); 
    RegionLocator rl = c.getRegionLocator(TableName.valueOf(TABLE_NAME)); 
    HFileOutputFormat2.configureIncrementalLoad(job,t,rl); 
    System.out.println("start"); 
    job.waitForCompletion(true); 
    if (job.isSuccessful()) { 
     HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME); 
    } else { 

     result = -1; 
    } 
    return result; 
} 



public static void doBulkLoad(String pathToHFile, String tableName) { 
    try { 
     Configuration configuration = new Configuration(); 
     configuration.set("mapreduce.child.java.opts", "-Xmx1g"); 
     HBaseConfiguration.addHbaseResources(configuration); 
     LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration); 


     //HTable hTable = new HTable(configuration, tableName); 
     //loadFfiles.doBulkLoad(new Path(pathToHFile), hTable); 

     Connection connection = ConnectionFactory.createConnection(configuration); 
     Table table = connection.getTable(TableName.valueOf(tableName)); 
     Admin admin = connection.getAdmin(); 
     RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName)); 
     //path, admin, table, region locator 
     loadFfiles.doBulkLoad(new Path(pathToHFile),admin,table,regionLocator); 


     System.out.println("Bulk Load Completed.."); 
    } catch(Exception exception) { 
     exception.printStackTrace(); 
    } 

正如意見中的要求,我在這裏添加表描述的輸出,導致表被蟒蛇happybase創建API和我'不知道什麼optionflags的API可以由默認設置...

{NAME => '0', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_B LOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLO CKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => '1', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_B LOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLO CKSIZE => '65536', REPLICATION_SCOPE => '0'}

+1

HBase批量加載默認附加數據,如果您將表和列族配置爲只存儲一行版本,則不會擦除舊日期,除非此情況。你可以添加到帖子你是如何創建你的表? – maxteneff

+0

嘿,我用Happybase api創建了表格,所以添加了表格描述...當我嘗試我的源代碼時,將相同組合的rowkey,family和列描述符放在兩個不同的值中,然後從此檢索列我只獲得最後一個值。但是,如果第一個放入字符串Value1,第二個放入Value2,我想要有像「Value1Value2」這樣的東西。 – Pils19

+1

您是如何檢查兩個鍵後只有一個版本的行的?如果您在兩次單獨的批量加載過程中嘗試插入兩個不同的密鑰,會發生什麼? – maxteneff

回答

1

在HFileOutputFormat2.configureIncrementalLoad()http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html#line.408 PutSortReducer用作還原劑。

PutSortReducer.reduce()http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/PutSortReducer.html KeyValues存儲在TreeSet中,僅使用比較器比較鍵。這就是爲什麼只有一個價值存活。

要保留2個值,您可以創建基於PutSortReducer的自己的reducer,您可以在其中保留2個值。並設置它:

HFileOutputFormat2.configureIncrementalLoad(job,t,rl); job.setReducerClass(MyReducer.class);

+0

是的創建自定義縮減器可能工作,所以你不會覆蓋文件具有相同的密鑰這種bulkload,但它不能解決其他問題,我想將文件中的數據添加到已創建新版本的數據中。 – Pils19

+1

覆蓋現有版本是一種普遍的HBase行爲,不是特定的bulkload。爲了解決這個問題,在自定義reducer中,您可以讀取HBase中的數據,並添加新的值。 –

+0

是的,我知道,這是一個普遍的行爲,但我想找到一個像append函數一樣讀取數據的地區服務器和concat從輸入和舊字節的方式... – Pils19

相關問題