2015-04-14

I am working with Spark and HBase, following HBaseTest.scala, and hBaseRDD.collect() gives an error.

rdd.count() returns the correct result, but when I try to execute rdd.collect() I get the following error:

java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

I can't figure out the problem. I just want to print a few rows of the HBase table.

Answer


I had the same problem and found the solution here.

Here is the code snippet:

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

// Raw HBase row as returned by Result.getMap:
// family -> qualifier -> timestamp -> value, as Java NavigableMaps.
type HBaseRow = java.util.NavigableMap[Array[Byte], java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
// Same structure as immutable Scala maps, which are serializable.
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]
type CFTimeseriesRowStr = scala.collection.immutable.Map[String, scala.collection.immutable.Map[String, scala.collection.immutable.Map[Long, String]]]

// Render the byte-array keys and values as Strings for printing.
def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr = navMap.map(cf =>
  (Bytes.toString(cf._1), cf._2.map(col =>
    (Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))

// Convert the nested Java NavigableMaps into serializable Scala maps.
def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)

// Map each (ImmutableBytesWritable, Result) pair to serializable Scala types
// *before* any action (collect, take, ...) ships records to the driver.
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  .map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
  .map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2)))
  .take(10).foreach(println)
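This works because count() only returns a number, while collect()/take() must Java-serialize every record to send it to the driver, and ImmutableBytesWritable does not implement java.io.Serializable. A minimal, Spark-free sketch of the same failure and the fix pattern (NonSerializableWritable is a hypothetical stand-in for ImmutableBytesWritable, not an HBase class):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for ImmutableBytesWritable: wraps bytes but is NOT Serializable.
class NonSerializableWritable(val bytes: Array[Byte])

object SerializationDemo {
  // Returns true if obj survives Java serialization, false on
  // NotSerializableException -- the same exception collect() hits per record.
  def trySerialize(obj: AnyRef): Boolean =
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      oos.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val raw = new NonSerializableWritable("row1".getBytes("UTF-8"))
    println(trySerialize(raw))       // the wrapper itself cannot be shipped
    println(trySerialize(raw.bytes)) // a plain Array[Byte] can
  }
}
```

Extracting the payload with kv._1.get() (an Array[Byte]) and converting Result's NavigableMaps to Scala maps, as the snippet above does, applies exactly this pattern.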