可以像下面的例子一樣使用Mapreduce和MultipleTableOutputFormat。
但在下面的例子中,我是從文件(即 TextInputFormat)讀取輸入的;
而你必須改用 TableInputFormat,
從名為 'all' 的 HBase 表中讀取,
而不是從 table1、table2 讀取……你要寫入的目標表則是 'animal'、'planet'、'human'。
根據您的要求,如果您在Hbase表上掃描並使用表格InputFormat將其傳遞給Mapper,則您還將獲得rowkey以及Mapper的映射方法。這需要比較以決定要插入哪個表。
Please see 7.2.2. HBase MapReduce Read/Write Example
package mapred;
import java.io.IOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.hbase.client.Put;
public class MultiTableMapper {
static class InnerMapper extends Mapper <LongWritable, Text, ImmutableBytesWritable, Put> {
public void map(LongWritable offset, Text value, Context context) throws IOException {
// contains the line of tab separated data we are working on (needs to be parsed out).
//byte[] lineBytes = value.getBytes();
String valuestring[]=value.toString().split(「\t」);
String rowid = /*HBaseManager.generateID();*/ 「12345」;
// rowKey is the hbase rowKey generated from lineBytes
Put put = new Put(rowid.getBytes());
put.add(Bytes.toBytes(「UserInfo」), Bytes.toBytes(「StudentName」), Bytes.toBytes(valuestring[0]));
try {
context.write(new ImmutableBytesWritable(Bytes.toBytes(「Table1」)), put);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} // write to the actions table
// rowKey2 is the hbase rowKey
Put put1 = new Put(rowid.getBytes());
put1.add(Bytes.toBytes(「MarksInfo」),Bytes.toBytes(「Marks」),Bytes.toBytes(valuestring[1]));
// Create your KeyValue object
//put.add(kv);
try {
context.write(new ImmutableBytesWritable(Bytes.toBytes(「Table2」)), put1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} // write to the actions table
}
}
public static void createSubmittableJob() throws IOException, ClassNotFoundException, InterruptedException {
Path inputDir = new Path(「in」);
Configuration conf = /*HBaseManager.getHBConnection();*/ new Configuration();
Job job = new Job(conf, 「my_custom_job」);
job.setJarByClass(InnerMapper.class);
FileInputFormat.setInputPaths(job, inputDir);
job.setMapperClass(InnerMapper.class);
job.setInputFormatClass(TextInputFormat.class);
// this is the key to writing to multiple tables in hbase
job.setOutputFormatClass(MultiTableOutputFormat.class);
//job.setNumReduceTasks(0);
//TableMapReduceUtil.addDependencyJars(job);
//TableMapReduceUtil.addDependencyJars(job.getConfiguration());
System.out.println(job.waitForCompletion(true));
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
MultiTableMapper.createSubmittableJob();
System.out.println();
}
}
MultiTableOutputFormat 可以滿足這個需求。請看我的答案。 –