2017-09-25 61 views
0

我編寫了一個Spark應用程序,該應用程序稍後將生成用於批量加載的HFiles,並使用LoadIncrementalHFiles命令。由於源數據池非常大,所以輸入文件被分割成迭代,依次處理。每次迭代創建自己的目錄HFile,所以我的HDFS結構是這樣的:使用LoadIncrementalHFiles和子目錄批量加載

/user/myuser/map_data/hfiles_0 
     ...   /hfiles_1 
     ...   /hfiles_2 
     ...   /hfiles_3 
        ... 

有此map_data目錄中大約500文件,所以我在尋找一種方法來自動調用LoadIncrementalHFiles功能,來處理這些子目錄也在迭代中。

相應的命令是這樣的:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable 

我需要變成一個迭代的命令這一點,因爲該命令不與子目錄工作(當我把它與/user/myuser/map_data目錄)!

我嘗試使用Java Process實例自動執行上面的命令,但是這看起來沒有做任何事情(沒有輸出到控制檯,也沒有更多的行在我的HBase表中)。

使用org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles我的代碼中的Java類也不起作用,它也沒有響應!

有沒有人爲我工作的例子?或者是否有一個參數能夠在父目錄上運行上述hbase命令?我正在Hortonworks Data Platform 2.5集羣中使用HBase 1.1.2。

編輯我試圖運行從一個Hadoop客戶端Java應用程序的LoadIncrementalHFiles命令,但我發現與活潑的壓縮異常,請參閱Run LoadIncrementalHFiles from Java client

回答

0

的解決方案是拆分hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable命令成許多部分(每命令部分一個),請參閱此Java代碼片段:

TreeSet<String> subDirs = getHFileDirectories(new Path(HDFS_PATH), hadoopConf); 

for(String hFileDir : subDirs) { 

    try { 
     String pathToReadFrom = HDFS_OUTPUT_PATH + "/" + hFileDir; 
     ==> String[] execCode = {"hbase", "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles", "-Dcreate.table=no", pathToReadFrom, hbaseTableName}; 
     ProcessBuilder pb = new ProcessBuilder(execCode); 
     pb.redirectErrorStream(true); 
     final Process p = pb.start(); 

     // Write the output of the Process to the console 
     new Thread(new Runnable() { 
      public void run() { 
       BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); 
       String line = null; 

       try { 
        while ((line = input.readLine()) != null) 
         System.out.println(line); 
       } catch (IOException e) { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 

    // Wait for the end of the execution 
    p.waitFor(); 
    ... 
}