
I am trying to write a Spark RDD to an HBase table using Scala (which I have not used before). The entire code looks like this:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} 
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} 
import org.apache.hadoop.hbase.mapreduce.TableInputFormat 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
import scala.collection.JavaConverters._ 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.spark._ 
import org.apache.hadoop.mapred.JobConf 
import org.apache.spark.rdd.PairRDDFunctions 
import org.apache.spark.SparkContext._ 
import org.apache.hadoop.mapred.Partitioner; 
import org.apache.hadoop.hbase.mapred.TableOutputFormat 
import org.apache.hadoop.hbase.client._ 

object HBaseWrite { 
    def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("HBaseWrite").setMaster("local").set("spark.driver.allowMultipleContexts","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    val sc = new SparkContext(sparkConf) 
    val conf = HBaseConfiguration.create() 
    val outputTable = "tablename" 

    System.setProperty("user.name", "hdfs") 
    System.setProperty("HADOOP_USER_NAME", "hdfs") 
    conf.set("hbase.master", "localhost:60000") 
    conf.setInt("timeout", 120000) 
    conf.set("hbase.zookeeper.quorum", "localhost") 
    conf.set("zookeeper.znode.parent", "/hbase-unsecure") 
    conf.setInt("hbase.client.scanner.caching", 10000) 
    sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result])) 
    val jobConfig: JobConf = new JobConf(conf,this.getClass) 
    jobConfig.setOutputFormat(classOf[TableOutputFormat]) 
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE,outputTable) 
    val x = 12 
    val y = 15 
    val z = 25 
    var newarray = Array(x,y,z) 
    val newrddtohbase = sc.parallelize(newarray) 
    def convert(a:Int) : Tuple2[ImmutableBytesWritable,Put] = { 
      val p = new Put(Bytes.toBytes(a)) 
      p.add(Bytes.toBytes("columnfamily"), 
      Bytes.toBytes("col_1"), Bytes.toBytes(a)) 
      new Tuple2[ImmutableBytesWritable,Put](new ImmutableBytesWritable(a.toString.getBytes()), p); 
    } 
    new PairRDDFunctions(newrddtohbase.map(convert)).saveAsHadoopDataset(jobConfig) 
    sc.stop() 
    } 
} 

After doing this and running HBaseWrite.main(Array()), the error I get is:

org.apache.spark.SparkException: Task not serializable 

How do I proceed to get this working?


Passing the 'convert' method to the map method as a function literal solved the problem. – Shankar

Answers


For example, the method below takes an Int as a parameter and returns a Double:

var toDouble: (Int) => Double = a => { 
    a.toDouble 
} 

You can call toDouble(2) and it returns 2.0.
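
A minimal usage sketch (mine, not part of the original answer), reusing the sc defined in the question's code:

val d = toDouble(2)      // d: Double = 2.0 
// Because toDouble is already a function value, it can be passed straight to map: 
val doubled = sc.parallelize(Array(1, 2, 3)).map(toDouble)   // RDD[Double] 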

You can turn your method into a function literal in the same way:

val convert: (Int) => Tuple2[ImmutableBytesWritable,Put] = a => { 
       val p = new Put(Bytes.toBytes(a)) 
       p.add(Bytes.toBytes("columnfamily"), 
       Bytes.toBytes("col_1"), Bytes.toBytes(a)) 
       new Tuple2[ImmutableBytesWritable,Put](new ImmutableBytesWritable(a.toString.getBytes()), p); 
     } 
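
With convert defined as a function literal, the write from the question would presumably become the following (an untested sketch, reusing newrddtohbase and jobConfig from the question's main method):

// Pass the function literal to map, then save through the configured JobConf 
new PairRDDFunctions(newrddtohbase.map(convert)).saveAsHadoopDataset(jobConfig) 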

Downvoter, come on... add some comments. – Shankar


I don't know why someone downvoted this. It removed the error for me. –


What you are doing wrong here is defining convert inside main. If you write the code this way, it should work:

object HBaseWrite { 
     def main(args: Array[String]) { 
     val sparkConf = new SparkConf().setAppName("HBaseWrite").setMaster("local").set("spark.driver.allowMultipleContexts","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
     val sc = new SparkContext(sparkConf) 
     val conf = HBaseConfiguration.create() 
     val outputTable = "tablename" 

     System.setProperty("user.name", "hdfs") 
     System.setProperty("HADOOP_USER_NAME", "hdfs") 
     conf.set("hbase.master", "localhost:60000") 
     conf.setInt("timeout", 120000) 
     conf.set("hbase.zookeeper.quorum", "localhost") 
     conf.set("zookeeper.znode.parent", "/hbase-unsecure") 
     conf.setInt("hbase.client.scanner.caching", 10000) 
     sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result])) 
     val jobConfig: JobConf = new JobConf(conf,this.getClass) 
     jobConfig.setOutputFormat(classOf[TableOutputFormat]) 
     jobConfig.set(TableOutputFormat.OUTPUT_TABLE,outputTable) 
     val x = 12 
     val y = 15 
     val z = 25 
     var newarray = Array(x,y,z) 
     val newrddtohbase = sc.parallelize(newarray) 
     val convertFunc = convert _ 
     new PairRDDFunctions(newrddtohbase.map(convertFunc)).saveAsHadoopDataset(jobConfig) 
     sc.stop() 
     } 
     def convert(a:Int) : Tuple2[ImmutableBytesWritable,Put] = { 
       val p = new Put(Bytes.toBytes(a)) 
       p.add(Bytes.toBytes("columnfamily"), 
       Bytes.toBytes("col_1"), Bytes.toBytes(a)) 
       new Tuple2[ImmutableBytesWritable,Put](new ImmutableBytesWritable(a.toString.getBytes()), p); 
     } 
    } 

P.S.: The code is not tested, but it should work!
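
A related pattern, shown here only as my own sketch under the same assumptions (column family "columnfamily", column "col_1" from the question), is to keep the conversion in a small serializable helper object, so the closure shipped to the executors references nothing created inside main:

import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
import org.apache.hadoop.hbase.util.Bytes 

// Hypothetical helper object: stateless, so the task closure stays serializable 
object RowConverter extends Serializable { 
  def convert(a: Int): (ImmutableBytesWritable, Put) = { 
    val p = new Put(Bytes.toBytes(a)) 
    p.add(Bytes.toBytes("columnfamily"), Bytes.toBytes("col_1"), Bytes.toBytes(a)) 
    (new ImmutableBytesWritable(a.toString.getBytes()), p) 
  } 
} 

// Inside main, in place of the convertFunc line: 
// new PairRDDFunctions(newrddtohbase.map(RowConverter.convert)).saveAsHadoopDataset(jobConfig) 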


Thanks for the response, but the error is still the same. –


Could you please paste the error stack trace too? –


org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1446) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:286) –