Unable to insert data into an HBase table using MapReduce

I have written a MapReduce job to read data from a file and insert it into an HBase table. The problem I am facing is that only 1 record gets inserted into the HBase table. I am not sure whether it is the last record or some random record, because my input file is about 10 GB. Given the logic I have written, I am certain that thousands of records should be inserted into the table. I am sharing only the reducer and driver class code, as I am confident the problem lies there. Please find the code below:
public static class Reduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Set<Text> uniques = new HashSet<Text>();
        String vis = key.toString();
        String[] arr = vis.split(":");
        Put put = null;
        for (Text val : values) {
            if (uniques.add(val)) {
                put = new Put(arr[0].getBytes());
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("column"), Bytes.toBytes(val.toString()));
            }
            context.write(new ImmutableBytesWritable(arr[0].getBytes()), put);
        }
    }
}
My driver class:
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "Blank");
job.setJarByClass(Class_name.class);
job.setMapperClass(Map.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setSortComparatorClass(CompositeKeyComprator.class);
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
job.setReducerClass(Reduce.class);
TableMapReduceUtil.initTableReducerJob(
"Table_name",
Reduce.class,
job);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
After running the program, the console shows Reduce output records = 73579, yet only 1 record is inserted into the table.
15/06/19 16:32:41 INFO mapred.JobClient: Job complete: job_201506181703_0020
15/06/19 16:32:41 INFO mapred.JobClient: Counters: 28
15/06/19 16:32:41 INFO mapred.JobClient: Map-Reduce Framework
15/06/19 16:32:41 INFO mapred.JobClient: Spilled Records=147158
15/06/19 16:32:41 INFO mapred.JobClient: Map output materialized bytes=6941462
15/06/19 16:32:41 INFO mapred.JobClient: Reduce input records=73579
15/06/19 16:32:41 INFO mapred.JobClient: Virtual memory (bytes) snapshot=7614308352
15/06/19 16:32:41 INFO mapred.JobClient: Map input records=140543
15/06/19 16:32:41 INFO mapred.JobClient: SPLIT_RAW_BYTES=417
15/06/19 16:32:41 INFO mapred.JobClient: Map output bytes=6794286
15/06/19 16:32:41 INFO mapred.JobClient: Reduce shuffle bytes=6941462
15/06/19 16:32:41 INFO mapred.JobClient: Physical memory (bytes) snapshot=892702720
15/06/19 16:32:41 INFO mapred.JobClient: Reduce input groups=1
15/06/19 16:32:41 INFO mapred.JobClient: Combine output records=0
15/06/19 16:32:41 INFO mapred.JobClient: Reduce output records=73579
15/06/19 16:32:41 INFO mapred.JobClient: Map output records=73579
15/06/19 16:32:41 INFO mapred.JobClient: Combine input records=0
15/06/19 16:32:41 INFO mapred.JobClient: CPU time spent (ms)=10970
15/06/19 16:32:41 INFO mapred.JobClient: Total committed heap usage (bytes)=829947904
15/06/19 16:32:41 INFO mapred.JobClient: File Input Format Counters
15/06/19 16:32:41 INFO mapred.JobClient: Bytes Read=204120920
15/06/19 16:32:41 INFO mapred.JobClient: FileSystemCounters
15/06/19 16:32:41 INFO mapred.JobClient: HDFS_BYTES_READ=204121337
15/06/19 16:32:41 INFO mapred.JobClient: FILE_BYTES_WRITTEN=14198205
15/06/19 16:32:41 INFO mapred.JobClient: FILE_BYTES_READ=6941450
15/06/19 16:32:41 INFO mapred.JobClient: Job Counters
When I write the reducer's output to a file, I get the correct output, but it does not end up in the HBase table. Please let me know what I am missing here. Thanks in advance.
So how should I modify the code? – Shash
Change put = new Put(arr[0].getBytes()); here and use a dynamic index, arr[i], instead of always 0. – Ramzy
It depends entirely on your design. There are two possible approaches: 1) append an incremental index (i++) to the row key, so that each value in the reducer goes to a new row key; 2) append an incremental index (i++) to the column qualifier name, so that each value is stored under the same row key but with a different qualifier. The choice depends entirely on your requirements. –
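For illustration, here is a minimal sketch of the second approach, not a definitive fix. It reuses the question's key format, the "cf" family, and the old Put.add(family, qualifier, value) API shown above; the qualifier names "column0", "column1", ... are an assumption. Each distinct value gets its own qualifier in a single Put per row key, so later cells no longer overwrite earlier ones:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

// Sketch only: approach 2 from the comment above, with an incremental
// index appended to the column qualifier (qualifier names are assumed).
public static class Reduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Set<Text> uniques = new HashSet<Text>();
        String[] arr = key.toString().split(":");
        byte[] rowKey = arr[0].getBytes();
        Put put = new Put(rowKey);
        int i = 0;
        for (Text val : values) {
            if (uniques.add(val)) {
                // "column" + i gives every distinct value its own qualifier
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("column" + i),
                        Bytes.toBytes(val.toString()));
                i++;
            }
        }
        // emit a single Put carrying all qualifiers for this row key
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}

With approach 1 you would instead build a new Put per value, using something like arr[0] + "_" + i as the row key, at the cost of spreading the values over many rows.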