2016-11-16 71 views
0

我正在使用結構化流式傳輸(Spark 2.0.2)來使用kafka消息。在protobuf中使用scalapb,消息。我收到以下錯誤。請幫助..在螺紋使用Spark 2.0.2構建的流式傳輸,Kafka源代碼和scalapb

異常 「主要」 scala.ScalaReflectionException:是 不 scala.reflect.api.Symbols $ SymbolApi $ class.asTerm(Symbols.scala:199)的一個術語 在 scala.reflect.internal.Symbols $ SymbolContextApiImpl.asTerm(Symbols.scala:84) 在 org.apache.spark.sql.catalyst.ScalaReflection $ class.constructParams(ScalaReflection.scala:811) 在 org.apache。 spark.sql.catalyst.ScalaReflection $ .constructParams(ScalaReflection.scala:39) at org.apache.spark.sql.catalyst.ScalaReflection $ class.getConstructor參數(ScalaReflection.scala:800) 在 org.apache.spark.sql.catalyst.ScalaReflection $ .getConstructorParameters(ScalaReflection.scala:39) 在 org.apache.spark.sql.catalyst.ScalaReflection $ $ .ORG apache $ spark $ sql $ catalyst $ ScalaReflection $$ serializerFor(ScalaReflection.scala:582) at org.apache.spark.sql.catalyst.ScalaReflection $ .org $ apache $ spark $ sql $ catalyst $ ScalaReflection $$ serializerFor( ScalaReflection.scala:460) 在 org.apache.spark.sql.catalyst.ScalaReflection $$ anonfun $ 9.apply(ScalaReflection.scala:592) 在 org.apache.spark.sql.catalyst.ScalaReflection $$ anonfun $ 9.apply(ScalaReflection.scala:583) at scala。 collection.TraversableLike $$ anonfun $ flatMap $ 1.apply(TraversableLike.scala:252) at scala.collection.TraversableLike $$ anonfun $ flatMap $ 1.apply(TraversableLike.scala:252) at scala.collection.immutable.List .foreach(List.scala:381)at scala.collection.TraversableLike $ class.flatMap(TraversableLike.scala:252) at scala.collection.immutable.List.flatMap(List.scala:344)at org.apache .spark.sql.catalyst.ScalaReflection $ .org $ apache $ spark $ sql $ catalyst $ ScalaReflection $$ serializerFor(ScalaReflection.scala:583) at org.apache.spark.sql.catalyst.ScalaReflection $ .serializerFor(ScalaReflection .scala:425) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder $ .apply(ExpressionEncoder .scala:61) at org.apache.spark.sql.Encoders $ .product(Encoders.scala:274)at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) at PersonConsumer $ 。主要(PersonConsumer.scala:33)在在 sun.reflect.NativeMethodAccessorImpl.invoke0(本機方法) PersonConsumer.main(PersonConsumer.scala)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498)at com.intellij.rt.execution.application.AppMain.main(AppMain .java:147)

以下是我的代碼...

object PersonConsumer { 
    import org.apache.spark.rdd.RDD 
    import com.trueaccord.scalapb.spark._ 
    import org.apache.spark.sql.{SQLContext, SparkSession} 
    import com.example.protos.demo._ 

    def main(args : Array[String]) { 

    def parseLine(s: String): Person = 
     Person.parseFrom(
     org.apache.commons.codec.binary.Base64.decodeBase64(s)) 

    val spark = SparkSession.builder. 
     master("local") 
     .appName("spark session example") 
     .getOrCreate() 

    import spark.implicits._ 

    val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load() 

    val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String] 

    val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons") 

    val ds4 = spark.sqlContext.sql("select name from persons") 

    val query = ds4.writeStream 
     .outputMode("append") 
     .format("console") 
     .start() 
    query.awaitTermination() 
    } 
} 

回答

0

val ds3行應該是:

val ds3 = ds2.map(str => parseLine(str)) 

sqlContext.protoToDataFrame(ds3).registerTempTable("persons") 

的RDD之前需要被保存爲臨時錶轉換爲數據幀。

0

在人類中,性別是一個枚舉,這是造成這個問題的原因。刪除此字段後,它工作正常。 以下是我從DataBricks的Shixiong(Ryan)得到的答案。

問題是「可選性別性別= 3;」。生成的類「Gender」是一個特性,Spark不知道如何創建特性,因此不支持。您可以定義SQL編碼器支持的類,並將此生成的類轉換爲parseLine中的新類。

相關問題