2017-10-19 78 views
1

我是新來的火花。我使用結構化流式傳輸從kafka讀取數據。Spark(2.2):deserialise使用結構化流式處理來自卡夫卡的節儉記錄

我可以在Scala中使用此代碼讀取數據:

val data = spark.readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", brokers) 
     .option("subscribe", topics) 
     .option("startingOffsets", startingOffsets) 
     .load() 

我在值列數據是節儉的記錄。 Streaming API以二進制格式提供數據。我看到將數據轉換爲字符串或json的示例,但我無法找到任何有關如何將數據反序列化到Thrift的示例。

我該如何做到這一點?

回答

0

我在databricks的網站上發現了這個博客。它展示瞭如何利用Spark SQL的API來消費和轉換來自Apache Kafka的複雜數據流。

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

有解釋UDF如何被用來解串器行的部分:

object MyDeserializerWrapper { 
    val deser = new MyDeserializer 
} 
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) => 
    MyDeserializerWrapper.deser.deserialize(topic, bytes) 
) 

df.selectExpr("""deserialize("topic1", value) AS message""") 

我用java,所以只好寫下面的示例UDF,以檢查它怎麼能在Java中被稱爲:

UDF1<byte[], String> mode = new UDF1<byte[], String>() { 
      @Override 
      public String call(byte[] bytes) throws Exception { 
       String s = new String(bytes); 
       return "_" + s; 
      } 
     }; 

現在我可以使用這個UDF在結構化流字數舉例如下:

Dataset<String> words = df 
       //converted the DataFrame to a Dataset of String using .as(Encoders.STRING()) 
//    .selectExpr("CAST(value AS STRING)") 
       .select(callUDF("mode", col("value"))) 
       .as(Encoders.STRING()) 
       .flatMap(
         new FlatMapFunction<String, String>() { 
          @Override 
          public Iterator<String> call(String x) { 
           return Arrays.asList(x.split(" ")).iterator(); 
          } 
         }, Encoders.STRING()); 

對我來說,下一步是爲節儉反序列化寫一個UDF。我會盡快發佈。

相關問題