2017-05-12 25 views
4

我使用Spark 2.1如何使用Java中的結構化流來反序列化Kafka中的記錄?

我想使用Spark Structured Streaming讀取來自Kafka的記錄,反序列化它們並在之後應用聚合。

我有以下代碼:

SparkSession spark = SparkSession 
      .builder() 
      .appName("Statistics") 
      .getOrCreate(); 

    Dataset<Row> df = spark 
      .readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", kafkaUri) 
      .option("subscribe", "Statistics") 
      .option("startingOffsets", "earliest") 
      .load(); 

    df.selectExpr("CAST(value AS STRING)") 

我想是到value領域反序列化到我的對象,而不是鑄塑String

我有一個自定義的解串器。

public StatisticsRecord deserialize(String s, byte[] bytes) 

如何在Java中執行此操作?


我發現的唯一相關鏈接是這個https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html,但這是用於Scala。

+0

你的消息是JSON格式嗎? – abaghel

+0

我可以通過自定義序列化器以JSON格式或序列化格式存儲數據。 – dchar

回答

2

爲您的JSON消息定義模式。

StructType schema = DataTypes.createStructType(new StructField[] { 
       DataTypes.createStructField("Id", DataTypes.IntegerType, false), 
       DataTypes.createStructField("Name", DataTypes.StringType, false), 
       DataTypes.createStructField("DOB", DataTypes.DateType, false) }); 

現在閱讀下面的消息。 MessageData是您的JSON消息的JavaBean。

Dataset<MessageData> df = spark 
      .readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", kafkaUri) 
      .option("subscribe", "Statistics") 
      .option("startingOffsets", "earliest") 
      .load() 
      .selectExpr("CAST(value AS STRING) as message") 
      .select(functions.from_json(functions.col("message"),schema).as("json")) 
      .select("json.*") 
      .as(Encoders.bean(MessageData.class)); 
+1

模式已正確應用,但我爲所有列獲得空值。我試圖讀取列作爲df.createOrReplaceTempView(「數據」); StreamingQuery query = spark.sql(「SELECT * FROM data」)。writeStream()。format(「console」)。start();難道我做錯了什麼? – dchar

+0

您可以直接像下面一樣閱讀數據集 df。 df.writeStream()格式( 「控制檯」)開始()。; – abaghel

+1

這產生了完全相同的結果。我在所有列中看到前20行都爲「null」。 – dchar

2

如果你有你的數據在Java中的自定義解串器,使用它,你從卡夫卡得到load後字節。

df.select("value") 

這條線給你Dataset<Row>只是一個單一的列value


我星火API斯卡拉完全是,所以我會做的斯卡拉以下處理「反序列化」案例:

import org.apache.spark.sql.Encoders 
implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord] 
val myDeserializerUDF = udf { bytes => deserialize("hello", bytes) } 
df.select(myDeserializerUDF($"value") as "value_des") 

這應該給你你想要的...在斯卡拉。將其轉換爲Java是您的家庭練習:)

請注意,您的自定義對象必須具有可用的編碼器,否則Spark SQL將拒絕將其對象放入數據集中。

相關問題