2016-01-22

I have two HBase tables that I need to join using Scala. The tables were imported from Oracle with sqoop and can be queried through the Hue data browser. How do I join two HBase tables in Spark using Scala?

I am using Spark 1.5 and Scala 2.10.4.

I am using the HBase connector from here: https://github.com/nerdammer/spark-hbase-connector

import it.nerdammer.spark.hbase._ 
import org.apache.hadoop.hbase.client.{ HBaseAdmin, Result } 
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor } 
import org.apache.hadoop.hbase.mapreduce.TableInputFormat 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
import org.apache.spark._ 
import it.nerdammer.spark.hbase.conversion.{ FieldReader, FieldWriter } 
import org.apache.hadoop.hbase.util.Bytes 

case class Artist(id: String, name: String, age: Int)

case class Cd(id: String, artistId: String, title: String, year: Int)

case class ArtistCd(id: String, name: String, title: String, year: Int)

implicit def artistReader: FieldReader[Artist] = new FieldReader[Artist] {

    override def map(data: HBaseData): Artist = Artist(
      id = Bytes.toString(data.head.get),
      name = Bytes.toString(data.drop(1).head.get),
      age = Bytes.toInt(data.drop(2).head.get))

    override def columns = Seq("NAME", "AGE")
}

implicit def cdReader: FieldReader[Cd] = new FieldReader[Cd] {

    override def map(data: HBaseData): Cd = Cd(
      id = Bytes.toString(data.head.get),
      artistId = Bytes.toString(data.drop(1).head.get),
      title = Bytes.toString(data.drop(2).head.get),
      year = Bytes.toInt(data.drop(3).head.get))

    override def columns = Seq("ARTIST_ID", "TITLE", "YEAR")
}

implicit def artistCdWriter: FieldWriter[ArtistCd] = new FieldWriter[ArtistCd] {

    override def map(data: ArtistCd): HBaseData =
      Seq(
        Some(Bytes.toBytes(data.id)),
        Some(Bytes.toBytes(data.name)),
        Some(Bytes.toBytes(data.title)),
        Some(Bytes.toBytes(data.year)))

    override def columns = Seq("NAME", "TITLE", "YEAR")
}

val conf = new SparkConf().setAppName("HBase Join").setMaster("spark://localhost:7337") 
val sc = new SparkContext(conf) 

val artistRDD = sc.hbaseTable[Artist]("ARTISTS").inColumnFamily("cf") 
val cdRDD = sc.hbaseTable[Cd]("CDS").inColumnFamily("cf") 

val artistById = artistRDD.keyBy(f => f.id) 
val cdById = cdRDD.keyBy(f => f.artistId) 

val artistcd = artistById.join(cdById) 

val artistCdRDD = artistcd.map(f => ArtistCd(f._2._1.id, f._2._1.name, f._2._2.title, f._2._2.year)) 
artistCdRDD.toHBaseTable("ARTIST_CD").inColumnFamily("cf").save() 
System.exit(1) 

When I run this, I get the following exception:

16/01/22 14:27:04 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 3, localhost): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2068 actions: ARTIST_CD: 2068 times, 
     at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:227) 
     at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:207) 
     at org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1663) 
     at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:208) 
     at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doMutate(BufferedMutatorImpl.java:141) 
     at org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:98) 
     at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:129) 
     at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1036) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034) 
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1042) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
     at org.apache.spark.scheduler.Task.run(Task.scala:88) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

If anyone has experience with this, I would really appreciate your help.

I have seen two solutions here, How to Join two tables in Hbase and how to join tables in hbase, but unfortunately I ...

Answer


Figured it out. First, the new table needs to already exist; I had assumed the save() command would create it, but it does not. Also, the new table must already have the column family you are saving into, here "cf".
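
For reference, a minimal sketch of pre-creating the target table with the standard HBase 1.x client API (an assumption on my part that this client version is on the classpath and that the default HBase configuration points at the right ZooKeeper quorum; the HBase shell equivalent is simply create 'ARTIST_CD', 'cf'):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

val hbaseConf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(hbaseConf)
val admin = connection.getAdmin

// The table and its column family must exist before save() runs;
// toHBaseTable("ARTIST_CD").inColumnFamily("cf").save() only writes, it does not create.
val tableName = TableName.valueOf("ARTIST_CD")
if (!admin.tableExists(tableName)) {
  val descriptor = new HTableDescriptor(tableName)
  descriptor.addFamily(new HColumnDescriptor("cf")) // must match inColumnFamily("cf")
  admin.createTable(descriptor)
}

admin.close()
connection.close()

Run this (or the shell command) once before the Spark job, and the RetriesExhaustedWithDetailsException against ARTIST_CD goes away.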