2016-04-10

Spark: inserting into HBase

I have an EMP POJO class like the one below. I can read the streaming data, and I want to insert it into HBase.

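import java.io.Serializable;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;

@JsonInclude(Include.NON_NULL)
public class empData implements Serializable {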

    private String id; 
    private String name; 

    @Override 
    public String toString() { 
     return "id=" + id + ", name="+ name ; 
    } 
    public String id() { 
     return id; 
    } 
    public void setId(String id) { 
     this.id = id; 
    } 
    public String getName() { 
     return name; 
    } 
    public void setName(String name) { 
     this.name = name; 
    } 

} 

Here is the Spark code:

empRecords.foreachRDD(new Function<JavaRDD<empData>, Void>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Void call(JavaRDD<empData> empDataEvent) throws Exception {

        // HBase connection settings (the ZooKeeper quorum is masked)
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "**********");
        HBaseAdmin.checkHBaseAvailable(config);
        config.set(TableInputFormat.INPUT_TABLE, "tableName");

        // Job object used only to carry the TableOutputFormat settings
        Job newAPIJobConfiguration1 = Job.getInstance(config);
        newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "empHbase");
        newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);

        // Problem step: the RDD holds empData objects, but this function is declared over Row
        JavaPairRDD<ImmutableBytesWritable, Put> inserts = empDataEvent.mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>() {

            public Tuple2<ImmutableBytesWritable, Put> call(Row row) throws Exception {
                Put put = new Put(Bytes.toBytes(row.getString(0)));
                put.add(Bytes.toBytes("empA"), Bytes.toBytes("id"), Bytes.toBytes(row.getString(1)));
                put.add(Bytes.toBytes("empA"), Bytes.toBytes("name"), Bytes.toBytes(row.getString(2)));
                return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
            }
        });

        inserts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
        return null; // a Function<..., Void> must still return a value
    }
});
jssc.start();
jssc.awaitTermination();
}

The problem in the code is this step:

JavaPairRDD<ImmutableBytesWritable, Put> inserts = empDataEvent.mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>()

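How do I use empDataEvent here, and how do I do the insert? That is, how do I pass the empDataEvent objects (of class empData) through mapToPair so that I can insert them into HBase? Any help is appreciated.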
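Conceptually, the function passed to `mapToPair` has to turn one `empData` object into one row key plus the cells a `Put` will carry. The plain-Java sketch below models only that conversion, without Spark or HBase on the classpath. `EmpCellSketch`, `Cell`, `rowKey`, `cells` and `utf8` are hypothetical names; using `id` as the row key is an assumption; and `utf8` mirrors what HBase's `Bytes.toBytes(String)` does (UTF-8 encoding). It also skips null fields, since the POJO's `@JsonInclude(Include.NON_NULL)` hints that they can be missing and `Bytes.toBytes` would throw on a null string.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class EmpCellSketch {

    // Minimal stand-in for the question's empData POJO (only the two fields used here).
    static class EmpData {
        final String id;
        final String name;

        EmpData(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // One HBase-style cell: column family, qualifier and value, all as raw bytes.
    static class Cell {
        final byte[] family;
        final byte[] qualifier;
        final byte[] value;

        Cell(byte[] family, byte[] qualifier, byte[] value) {
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }
    }

    // Column family name taken from the question's code.
    static final byte[] FAMILY = utf8("empA");

    // HBase's Bytes.toBytes(String) encodes as UTF-8; this does the same with the JDK.
    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Assumed row key: the employee id. A record without an id cannot be stored.
    static byte[] rowKey(EmpData e) {
        if (e.id == null) {
            throw new IllegalArgumentException("id is required for the row key");
        }
        return utf8(e.id);
    }

    // Cells for one record; null fields are skipped instead of being encoded.
    static List<Cell> cells(EmpData e) {
        List<Cell> out = new ArrayList<>();
        if (e.id != null) {
            out.add(new Cell(FAMILY, utf8("id"), utf8(e.id)));
        }
        if (e.name != null) {
            out.add(new Cell(FAMILY, utf8("name"), utf8(e.name)));
        }
        return out;
    }

    public static void main(String[] args) {
        EmpData e = new EmpData("42", null);
        // Prints the row key and how many cells would go into the Put.
        System.out.println(new String(rowKey(e), StandardCharsets.UTF_8)
                + " -> " + cells(e).size() + " cell(s)");
    }
}
```

In the real job, each `Cell` would become one `put.add(...)` call on the `Put` built from `rowKey`.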

Answer


Aman,

Your code refers to `Row`. Could you explain where it comes from? Nothing in the code you posted defines or produces it.

See the updated code below, which uses your class `empData` instead of `Row` as the function's input type.

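// Call mapToPair on the RDD instance (empDataEvent), not on the class name
JavaPairRDD<ImmutableBytesWritable, Put> inserts = empDataEvent.mapToPair(new PairFunction<empData, ImmutableBytesWritable, Put>() {

    // The input type is empData, the element type of JavaRDD<empData>
    @Override
    public Tuple2<ImmutableBytesWritable, Put> call(empData row) throws Exception {
        Put put = new Put(Bytes.toBytes(row.id())); // row key = employee id
        put.add(Bytes.toBytes("empA"), Bytes.toBytes("id"), Bytes.toBytes(row.id()));
        put.add(Bytes.toBytes("empA"), Bytes.toBytes("name"), Bytes.toBytes(row.getName()));
        // TableOutputFormat ignores the key and only writes the Put, so an empty key is fine
        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
    }
});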
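The same per-element contract can be seen without Spark: the function handed to `mapToPair` receives one element of the RDD at a time, so its input type has to be the RDD's element type (`empData`), and it is invoked through the RDD instance (`empDataEvent`). The sketch below is only an analogy using `java.util.stream` on a `List`; `MapToPairSketch`, `toPair`, `mapToPairs` and the string value standing in for a `Put` are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MapToPairSketch {

    // Stand-in for the question's empData POJO, with the same accessor names (id(), getName()).
    static class EmpData {
        private final String id;
        private final String name;

        EmpData(String id, String name) {
            this.id = id;
            this.name = name;
        }

        public String id() {
            return id;
        }

        public String getName() {
            return name;
        }
    }

    // Per-element function: its parameter type is EmpData, the collection's element type,
    // just as PairFunction's first type parameter must match JavaRDD<empData>.
    static Map.Entry<String, String> toPair(EmpData row) {
        return new SimpleEntry<>(row.id(), "empA:name=" + row.getName());
    }

    // Applies toPair to every element of a batch, analogous to empDataEvent.mapToPair(...).
    static List<Map.Entry<String, String>> mapToPairs(List<EmpData> batch) {
        return batch.stream()
                .map(MapToPairSketch::toPair)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<EmpData> batch = Arrays.asList(new EmpData("1", "Ann"), new EmpData("2", "Bo"));
        for (Map.Entry<String, String> p : mapToPairs(batch)) {
            System.out.println(p.getKey() + " -> " + p.getValue());
        }
    }
}
```

In the real job, `toPair` plays the role of the `PairFunction<empData, ImmutableBytesWritable, Put>` and `mapToPairs` the role of `empDataEvent.mapToPair(...)`. Spark ships that function to the executors, which is why its function interfaces extend `Serializable`.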