2017-02-01 55 views
0

檢索從HBase的表中的數據我有兩個HBase的表「hbaseTable」,「hbaseTable1」和蜂巢表「hiveTable」 我的查詢是這樣的:我如何做一個內部聯接,並把它變成蜂巢

'insert overwrite hiveTable select col1, h2.col2, col3 from hbaseTable h1,hbaseTable2 h2 where h1.col=h2.col2'; 

我需要在hbase中進行內部連接並將數據帶入配置單元。我們正在使用java的配置單元,這給了很差的性能。 因此計劃通過使用spark來改變方法。即使用java的火花 如何使用SPARK從我的JAVA代碼連接到hbase。

現在我的Spark代碼應該在hbase中進行連接,並通過上述查詢將數據引入配置單元。

請提供示例代碼。

回答

1

如果您使用spark來加載hbase數據,那麼爲什麼要將它加載到配置單元中? 你可以使用類似於配置單元和sql的spark sql。 您可以根本不使用配置單元查詢數據。 例如:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapreduce.TableInputFormat; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.SQLContext; 
import scala.Tuple2; 
import java.util.Arrays; 

public class SparkHbaseHive { 
    public static void main(String[] args) { 
     Configuration conf = HBaseConfiguration.create(); 
     conf.set(TableInputFormat.INPUT_TABLE, "test"); 
     JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("Spark-Hbase").setMaster("local[3]")); 
     JavaPairRDD<ImmutableBytesWritable, Result> source = jsc 
       .newAPIHadoopRDD(conf, TableInputFormat.class, 
         ImmutableBytesWritable.class, Result.class); 
     SQLContext sqlContext = new SQLContext(jsc); 
     JavaRDD<Table1Bean> rowJavaRDD = 

source.map((Function<Tuple2<ImmutableBytesWritable, Result>, Table1Bean>) object -> { 
      Table1Bean table1Bean = new Table1Bean(); 
      table1Bean.setRowKey(Bytes.toString(object._1().get())); 


table1Bean.setColumn1(Bytes.toString(object._2().getValue(Bytes.toBytes("colfam1"), Bytes.toBytes("col1")))); 
      return table1Bean; 
    }); 
     DataFrame df = sqlContext.createDataFrame(rowJavaRDD, Table1Bean.class); 

     //similarly create df2 
     //use df.join() and then register as joinedtable or register two tables and join 
     //execute sql queries 

     //Example of sql query on df 
     df.registerTempTable("table1"); 
     Arrays.stream(sqlContext.sql("select * from table1").collect()).forEach(row -> System.out.println(row.getString(0) + "," + row.getString(1))); 

    } 
} 
public class Table1Bean { 
    private String rowKey; 
    private String column1; 


    public String getRowKey() { 
     return rowKey; 
    } 

    public void setRowKey(String rowKey) { 
     this.rowKey = rowKey; 
    } 

    public String getColumn1() { 
     return column1; 
    } 

    public void setColumn1(String column1) { 
     this.column1 = column1; 
    } 
} 

如果你需要一些理由使用HiveContext從蜂巢讀取和使用saveAsTable持久化數據使用蜂巢。 如有疑問,請告知我。

+0

謝謝!我會嘗試這看起來會幫助。 – BadBoy777