2017-07-10 89 views
0

我希望有人能告訴我這兩個API調用之間的區別是什麼。我在他們兩個之間得到了奇怪的結果。這發生在hbase-client/hbase-server版本1.0.1和1.2.0-cdh5.7.2中。在Hbase中,ResultScanner和initTableMapperJob之間的掃描有什麼區別

首先,我的行鍵格式爲hash_name_timestamp 100_servername_1234567890。 hbase表的TTL爲30天,因此壓縮後超過30天的情況應該消失。

以下是使用ResultScanner的代碼。它不使用MapReduce,因此需要很長時間才能完成。我無法以這種方式運行我的工作,因爲它需要很長時間。但是,出於調試目的,我沒有任何問題,這種方法。它列出了所有鍵指定的時間範圍內,這看起來像是有效到我,因爲返回鍵的所有時間戳在過去30天,並在規定的時間範圍內:

Scan scan = new Scan(); 
scan.addColumn(Bytes.toBytes("raw_data"), Bytes.toBytes(fileType)); 
scan.setCaching(500); 
scan.setCacheBlocks(false); 
scan.setTimeRange(start, end); 

Connection fConnection = ConnectionFactory.createConnection(conf); 
Table table = fConnection.getTable(TableName.valueOf(tableName)); 
ResultScanner scanner = table.getScanner(scan); 
for (Result result = scanner.next(); result != null; result = scanner.next()) { 
    System.out.println("Found row: " + Bytes.toString(result.getRow())); 
} 

後續的代碼不能正常工作但它使用MapReduce,它的運行速度比使用ResultScanner的方式要快,因爲它將事件分成1200個地圖。問題是我理解的應該已經消失,由於TTL到期rowkeys:

Scan scan = new Scan(); 
scan.addColumn(Bytes.toBytes("raw_data"), Bytes.toBytes(fileType)); 
scan.setCaching(500); 
scan.setCacheBlocks(false); 
scan.setTimeRange(start, end); 
TableMapReduceUtil.initTableMapperJob(tableName, scan, MTTRMapper.class, Text.class, IntWritable.class, job); 

這裏是我的錯誤,後來這最終殺死整個MR工作,因爲映射器的25%以上失敗。

Error: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Wed Jun 28 13:46:57 PDT 2017, null, java.net.SocketTimeoutException: callTimeout=120000, callDuration=120301: row '65_app129041.iad1.mydomain.com_1476641940' on table 'server_based_data' at region=server_based_data

我會努力學習的HBase的客戶端和HBase的服務器罐子的代碼,但希望有人會副手知道方法之間的區別是什麼,是什麼原因造成的initTableMapperJob調用失敗。

編輯:這是我使用的表的描述:

describe 'server_based_data' 
Table server_based_data is ENABLED            
server_based_data                
COLUMN FAMILIES DESCRIPTION              
{NAME => 'raw_data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLIC 
ATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'SNAPPY', MIN_VERSIONS => '0 
', TTL => '2592000 SECONDS (30 DAYS)', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE 
=> '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}       
1 row(s) in 0.5180 seconds 

這裏是我的映射器代碼:

public void map(ImmutableBytesWritable rowkey, Result columns, Context context) throws IOException, InterruptedException { 
    Configuration conf = context.getConfiguration(); 
    startMS = conf.getLong("startTime", 0); 
    endMS = conf.getLong("endTime", 1); 
    System.out.println(startMS); 
    System.out.println(endMS); 

    // extract the ci as the key from the rowkey 
    String pattern = "\\d*_(\\S*)_(\\d{10})"; 
    String ciname = null; 
    Pattern r = Pattern.compile(pattern); 
    String strRowKey = Bytes.toString(rowkey.get()); 
    // check the time here to see if we count it or not in the counts 

    Matcher m = r.matcher(strRowKey); 
    long ts = 0; 

    if (m.find()) { 
     ts = Long.valueOf(m.group(2)).longValue(); 
     ciname = m.group(1); 
     if ((ts >= startMS) && (ts <= endMS)) { 
      context.write(new Text(ciname), ONE);   
     }   
    }  
} 

我還是覺得有什麼東西與initTableMapperJob方法,因爲我上面發佈的錯誤顯示了一個應該從表的TTL過期的行的時間戳,但由於某種原因,initTableMapperJob仍然會找到它並嘗試查找它,但超時,而ResultScanner由於某種原因沒有看到它。

+0

能否請您提供更多關於您地圖縮小作業的細節並描述Hbase表格。如果你有興趣,我可以建議你在Hbase上使用Spark。 – gorros

+0

@gorros,謝謝你的幫助。是的,如果你有火花的建議,我會聽。希望這很容易實現,我很聰明,能夠理解你。我還根據您的要求更新了一些更詳細的問題。 – Classified

回答

1

我想提出幾點建議。

  1. 看看Spark on HBase examples,特別是如何用Spark執行掃描。它是用Scala編寫的,但是你可以在Java中實現。即使在這種情況下,它也會比MapReduce代碼更加簡潔。
  2. 如果您只需要行鍵,請添加相應的過濾器,如FirstKeyOnlyFilter。這將減少從HBase檢索到的不必要數據量。

我不知道什麼導致了initTableMapperJob的奇怪行爲。但我希望以上建議將會有用。

相關問題