5
我需要從文本文件加載數據到Map Reduce,我從很多天開始瀏覽,但是我沒有爲我的工作找到任何正確的解決方案。是否有任何方法或類從系統讀取文本/ csv文件並將數據存儲到HBASE表中。它對我來說真的非常緊迫,任何人都可以幫助我瞭解MapReduce F/w。從系統讀取文本文件到Hbase MapReduce
我需要從文本文件加載數據到Map Reduce,我從很多天開始瀏覽,但是我沒有爲我的工作找到任何正確的解決方案。是否有任何方法或類從系統讀取文本/ csv文件並將數據存儲到HBASE表中。它對我來說真的非常緊迫,任何人都可以幫助我瞭解MapReduce F/w。從系統讀取文本文件到Hbase MapReduce
對於從文本文件中讀取首先文本文件應該在hdfs中。 您需要指定作業輸入格式和OUTPUTFORMAT
Job job = new Job(conf, "example");
FileInputFormat.addInputPath(job, new Path("PATH to text file"));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(YourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TableMapReduceUtil.initTableReducerJob("hbase_table_name", YourReducer.class, job);
job.waitForCompletion(true);
YourReducer
應該延伸org.apache.hadoop.hbase.mapreduce.TableReducer<Text, Text, Text>
樣品減速碼
public class YourReducer extends TableReducer<Text, Text, Text> {
private byte[] rawUpdateColumnFamily = Bytes.toBytes("colName");
/**
* Called once at the beginning of the task.
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// something that need to be done at start of reducer
}
@Override
public void reduce(Text keyin, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// aggregate counts
int valuesCount = 0;
for (Text val : values) {
valuesCount += 1;
// put date in table
Put put = new Put(keyin.toString().getBytes());
long explicitTimeInMs = new Date().getTime();
put.add(rawUpdateColumnFamily, Bytes.toBytes("colName"), explicitTimeInMs,val.toString().getBytes());
context.write(keyin, put);
}
}
}
樣品映射類
public static class YourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}