2016-08-13 31 views
1

我想從Spark加載大量的數據到HBase。我正在使用saveAsNewAPIHadoopDataset方法。問題與對象不可序列化類:org.apache.hadoop.hbase.io.ImmutableBytesWritable錯誤

我正在創建ImmutableWritable並放置並保存,如下所示。

dataframe.mapPartitions { rows => 
     { 
     rows.map { eachRow => 
      { 
      val rowKey = Seq(eachRow.getAs[String]("uniqueId"), eachRow.getAs[String]("authTime")).mkString(",") 
      val put = new Put(Bytes.toBytes(rowKey)); 
      val fields = eachRow.schema.fields; 

      for (i <- 0 until fields.length) { 
       put.addColumn(userCF, Bytes.toBytes(fields(i).name), Bytes.toBytes(String.valueOf(eachRow.get(i)))) 
      } 

      (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put) 
      } 
     } 
     } 
    }.saveAsNewAPIHadoopDataset(job.getConfiguration) 

我的數據是30GB的價值,它存在於60個文件的HDFS中。

當我一次提交10個文件的同一份工作時,每件事情都很順利。

但是,當我一次提交所有的東西,它是給這個錯誤。這個錯誤真的令人沮喪,我嘗試了所有可能的事情。但真正想知道是什麼讓數據在5GB時成功運行,以及是什麼導致30GB數據時出錯。

有沒有人遇到過這樣的問題。

+0

你可以把完整的堆棧跟蹤在這裏......? –

+0

當我爲每個執行程序分配40G內存時,工作正在接近,但是每當我分配的內存更少時,我都面臨着這個錯誤。這意味着,每當有一個洗牌,我得到這個錯誤。 – Srini

回答

0

這是因爲ImmutableBytesWritable不可序列化。當有洗牌時,apache spark會嘗試序列化它以發送到另一個節點。如果你想嘗試拿一些或收集在司機身上,也會發生同樣的情況。

實際上只有兩種方法。

  • 不要在洗牌時使用它。如果你只是需要把每個記錄從磁盤放入數據庫,那麼看起來像洗牌是不需要的。確保它是。如果您需要在數據進入數據庫之前對其進行預處理,請將其保存爲其他可序列化格式,並在保存時將其轉換爲僅需的數據。
  • 使用另一個序列化程序。 Apache Spark帶有Kryo(確保你使用的是spark 2.0.0-- Kryo已經在那裏更新了,它修復了一些令人討厭的併發錯誤)。爲了使用它,你必須配置它。這並不難,但需要一些代碼。
+0

嗨evgeni。是的,我嘗試了kryo,並通過在類的列表中提供它來簡化該類的可序列化。但克里給了我其他的序列化問題。精確地說,索引出界問題。所以我別無選擇,只能放棄它..我希望新的API能夠更好地使用kryo API。 – Srini

+0

@Srini,你用什麼火花版本?如果在2.0.0之前,那麼Kryo遇到了問題,您無法解決問題。可能就是這樣。 – evgenii