3

我需要編寫一個讀取DataSet [Row]並將其轉換爲DataSet [CustomClass] 的作業,其中CustomClass是一個protobuf類。如何將行映射到protobuf生成的類?

val protoEncoder = Encoders.bean(classOf[CustomClass]) 
val transformedRows = rows.map { 
    case Row(f1: String, f2: Long) => { 
    val pbufClass = CustomClass.newBuilder() 
          .setF1(f1) 
          .setF2(f2) 
    pbufClass.build()}}(protoEncoder) 

然而,貌似的Protobuf類不是真正的Java Bean和我得到以下

val x = Encoders.bean(classOf[CustomClass]) 

一個NPE一個人如何去確保作業可以發出 類型的數據集DataSet [CustomClass]其中CustomClass是protobuf類。 任何指針/關於爲該類編寫自定義編碼器的示例?

NPE:

val encoder2 = Encoders.bean(classOf[CustomClass]) 
java.lang.NullPointerException 
    at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465) 
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126) 
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125) 
    at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55) 
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89) 
    at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142) 
    ... 48 elided 

豆編碼器內部使用

JavaTypeInference.serializerFor(protoClass) 

如果我試圖做同樣在我的自定義編碼器,我得到一個更具描述性的錯誤消息:

Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant 
     at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430) 
     at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337) 
     at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69) 
     at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82) 
     at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82) 
     at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84) 
     at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81) 
     at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala) 
+0

你可以粘貼NPE到你的問題? –

+0

添加堆棧跟蹤,很確定此時發生這種情況是因爲Protobuf類不是有效的Java bean – Apurva

+0

@JacekLaskowski:更新了堆棧跟蹤(使用Encoders.bean以及在自定義編碼器中使用類似的代碼) - 這有幫助嗎? – Apurva

回答

2

對於將Row轉換爲Protobuf類,您可以使用sparksql-protobuf

該庫提供的實用程序可與SparkSql的 Protobuf對象一起使用。它提供了一種方法來讀取SparkSQL 寫成的parquet文件作爲兼容protobuf對象的RDD。它也可以將 將protobuf對象的RDD轉換爲DataFrame。

添加一個依賴於你的build.sbt文件

resolvers += Resolver.jcenterRepo 

libraryDependencies ++= Seq(
    "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2", 
    "org.apache.parquet" % "parquet-protobuf" % "1.8.1" 

您可以按照一些例子從圖書館上手

Example 1

Example 2

我希望這有助於!

+0

謝謝,我看看這個, 「它提供了一種方法來讀取由SparkSQL寫成的parquet文件作爲兼容protobuf對象的RDD」 - 這個假設對我來說不一定是真實的 - 底層表示不在實木複合地板上。 – Apurva

+0

我還沒有與火花和protobuf工作,但這應該幫助你 –

+0

更多的上下文,我試着寫我自己的編碼器。 VAL串行= JavaTypeInference.serializerFor(protoClass) 這是因爲人的懷疑失敗:java.lang.UnsupportedOperationException: 致無法推斷類xxx.yyy.CustomClass類型,因爲它是不豆兼容 在組織.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $$ serializerFor(JavaTypeInference.scala:430) – Apurva

1

My experience with Encoders are not very promising在這一點上,我會建議不要花更多的時間在這個。

我寧願去想方案,以及如何與星火工作的方式和火花計算的結果在最後一步映射到的protobuf生成的類。

+1

謝謝@JacekLaskowski您的指南在一般情況下非常有用。我希望能夠使用映射操作以分佈式方式將它們寫入關鍵值存儲區。到目前爲止,我的嘗試看起來與你提到的非常相似,但沒有運氣。將發佈更新,如果我得到這個工作。 – Apurva

0

雖然不是一個嚴格的答案,但我確實得到了一個解決方法。如果我們使用RDD,則不需要編碼器。

val rows = 
     spark.sql("select * from tablename").as[CaseClass].rdd 
val transformedRows = rows.map { 
    case Row(f1: String, f2: Long) => { 
    val pbufClass = CustomClass.newBuilder() 
          .setF1(f1) 
          .setF2(f2) 
    pbufClass.build()}} 

這給了我可以使用的Protobuf類的RDD。

+0

爲什麼不直接用sparksql-protobuf(saurfang的github)構建RDD [Proto]? – belka

0

我這樣做的方式:我使用saurfang的sparksql-protobuf庫(Github上提供的代碼)。您直接獲得RDD [ProtoSchema],但難以轉換爲數據集[ProtoSchema]。我用它來獲取信息,主要用戶定義函數附加到另一個RDD。

1:導入庫

與Maven:

<dependencies> 
    <dependency> 
     <groupId>com.github.saurfang</groupId> 
     <artifactId>sparksql-protobuf_2.10</artifactId> 
     <version>0.1.2</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.parquet</groupId> 
     <artifactId>parquet-protobuf</artifactId> 
     <version>1.9.0</version> 
    </dependency> 

    <dependency> 
     <groupId>com.google.protobuf</groupId> 
     <artifactId>protobuf-java</artifactId> 
     <version>3.5.1</version> 
    </dependency> 
</dependencies> 
... 

<repositories> 
    <repository> 
     <snapshots> 
      <enabled>false</enabled> 
     </snapshots> 
     <id>bintray-saurfang-maven</id> 
     <name>bintray</name> 
     <url>https://dl.bintray.com/saurfang/maven</url> 
    </repository> 
</repositories> 

2:讀數據作爲RDD [ProtoSchema]

val sess: SparkSession = ... 
val proto_rdd = new ProtoParquetRDD[ProtoSchema](sess.sparkContext, input_path, classOf[ProtoSchema]) 

(可選)添加PathFilter(Hadoop API)

如果你想添加一個PathFilter類(喜歡你用來使用Hadoop),或激活您曾與Hadoop的其他選項,你可以這樣做:

sess.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true) 
sess.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[MyPathFiltering], classOf[PathFilter]) 

但是不要忘了清除您的Hadoop配置,如果你想用你的SparkSession來閱讀其他東西:

sess.sparkContext.hadoopConfiguration.clear() 
相關問題