I need to write a job that reads a Dataset[Row] and converts it into a Dataset[CustomClass], where CustomClass is a protobuf class. How can I map the rows to the protobuf-generated class?
val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long) =>
    val pbufClass = CustomClass.newBuilder()
      .setF1(f1)
      .setF2(f2)
    pbufClass.build()
}(protoEncoder)
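(For context, a commonly suggested workaround for classes that are not bean-compliant is a Kryo-based encoder. This is only a sketch under the assumption of a Spark 2.x API; `CustomClass` stands in for the generated protobuf class, and the resulting Dataset stores opaque binary values rather than typed columns:)

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row}

// Kryo serializes arbitrary JVM objects, so it does not require bean-style
// getters/setters — protobuf-generated classes work without modification.
// Trade-off: the Dataset is stored as a single binary column, so columnar
// operations and SQL on individual fields are not available.
implicit val protoEncoder: Encoder[CustomClass] = Encoders.kryo[CustomClass]

def transform(rows: Dataset[Row]): Dataset[CustomClass] =
  rows.map {
    case Row(f1: String, f2: Long) =>
      CustomClass.newBuilder()
        .setF1(f1)
        .setF2(f2)
        .build()
  }  // picks up the implicit Kryo encoder
```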
However, it seems that protobuf classes are not true Java beans, and the following line throws an NPE:
val x = Encoders.bean(classOf[CustomClass])
How does one ensure that the job can emit a Dataset of type Dataset[CustomClass], where CustomClass is a protobuf class? Any pointers/examples on writing a custom encoder for such a class?
NPE:
val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
... 48 elided
The bean encoder internally uses
JavaTypeInference.serializerFor(protoClass)
If I try to do the same in my custom encoder, I get a more descriptive error message:
Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
Could you paste the NPE into your question? –
Added the stack trace; fairly sure this happens because the protobuf class is not a valid Java bean – Apurva
@JacekLaskowski: Updated the stack trace (using Encoders.bean, and also similar code inside a custom encoder) – does that help? – Apurva