
I am running into a problem when trying to insert data into HBase. I am running Scala code in a Spark shell on Google Cloud and trying to insert data from an RDD (hbaseRDD) into HBase (Bigtable). When inserting the data from the RDD I get an HBase serialization error.

Format: RDD[(String, Map[String, String])]

The String is the row ID, and the Map holds its corresponding columns and their values.
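For concreteness, a minimal sketch of what an RDD in that shape might look like (the row keys and column names here are invented for illustration; sc is the spark-shell SparkContext):

// Hypothetical sample data shaped like RDD[(String, Map[String, String])]:
// row key -> map of (column qualifier -> value)
val hbaseRDD = sc.parallelize(Seq(
  ("row-001", Map("pageName" -> "home", "visitId" -> "v123")),
  ("row-002", Map("pageName" -> "checkout", "visitId" -> "v456"))
))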

The code looks like this:

import com.google.cloud.bigtable.hbase.BigtableConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

val tableName: String = "omniture";

val connection = BigtableConfiguration.connect("*******", "**********")
val admin = connection.getAdmin();
val table = connection.getTable(TableName.valueOf(tableName));

TRY 1 :
    hbaseRDD.foreach { w =>
      val put = new Put(Bytes.toBytes(w._1));
      var ColumnValue = w._2

      ColumnValue.foreach { x =>
        put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2));
      }
      table.put(put);
    }

TRY 2 :
    hbaseRDD.map { w =>
      val put = new Put(Bytes.toBytes(w._1));
      var ColumnValue = w._2

      ColumnValue.map { x =>
        put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2));
      }
      table.put(put);
    }

Below is the error I am getting:

org.apache.spark.SparkException: Task not serializable 
Caused by: java.io.NotSerializableException: com.google.cloud.bigtable.hbase.BigtableTable 
Serialization stack: 
     - object not serializable (class: com.google.cloud.bigtable.hbase.BigtableTable, value: BigtableTable{hashCode=0x7d96618, project=cdp-dev-201706-01, instance=cdp-dev-cl-hbase-instance, table=omniture, host=bigtable.googleapis.com}) 
     - field (class: logic.ingestion.Ingestion$$anonfun$insertTransactionData$1, name: table$1, type: interface org.apache.hadoop.hbase.client.Table) 
     - object (class logic.ingestion.Ingestion$$anonfun$insertTransactionData$1, <function1>) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
     ... 27 more 

Any help would be appreciated. Thanks in advance.

Answer


With reference to: Writing to HBase via Spark: Task not serializable

The Connection/Table objects cannot be serialized and shipped from the driver to the executors, so they have to be created inside foreachPartition, once per partition, on the executor itself. Below is the correct way to do it:

hbaseRDD.foreachPartition { w =>

  // Create the connection and table inside the partition, on the executor,
  // so nothing non-serializable has to be shipped from the driver
  val tableName: String = "omniture"

  val connection = BigtableConfiguration.connect("cdp-dev-201706-01", "cdp-dev-cl-hbase-instance")
  val admin = connection.getAdmin()

  val table = connection.getTable(TableName.valueOf(tableName))

  w.foreach { f =>
    val put = new Put(Bytes.toBytes(f._1))

    val columnValue = f._2
    columnValue.foreach { x =>
      put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2))
    }
    table.put(put)
  }
}

hbaseRDD.collect();
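Going one step further, here is a hedged sketch of the same idea that also closes the connection when each partition finishes and sends the Puts in a single batched call; the batching and the cleanup are my own additions, not part of the original answer:

import scala.collection.JavaConverters._
import com.google.cloud.bigtable.hbase.BigtableConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

hbaseRDD.foreachPartition { partition =>
  // One connection per partition, created on the executor
  val connection = BigtableConfiguration.connect("cdp-dev-201706-01", "cdp-dev-cl-hbase-instance")
  try {
    val table = connection.getTable(TableName.valueOf("omniture"))
    // Build all Puts for this partition, then write them in one call
    val puts = partition.map { case (rowKey, columns) =>
      val put = new Put(Bytes.toBytes(rowKey))
      columns.foreach { case (qualifier, value) =>
        put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(qualifier), Bytes.toBytes(value))
      }
      put
    }.toList
    if (puts.nonEmpty) table.put(puts.asJava)
  } finally {
    connection.close() // avoid leaking one connection per partition
  }
}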

The details are explained in the link above.
