我使用Spark 2.1。如何使用Java中的結構化流來反序列化Kafka中的記錄?
我想使用Spark Structured Streaming讀取來自Kafka的記錄,反序列化它們並在之後應用聚合。
我有以下代碼:
SparkSession spark = SparkSession
.builder()
.appName("Statistics")
.getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUri)
.option("subscribe", "Statistics")
.option("startingOffsets", "earliest")
.load();
df.selectExpr("CAST(value AS STRING)")
我想是到value
領域反序列化到我的對象,而不是鑄塑String
。
我有一個自定義的解串器。
public StatisticsRecord deserialize(String s, byte[] bytes)
如何在Java中執行此操作?
我發現的唯一相關鏈接是這個https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html,但這是用於Scala。
你的消息是JSON格式嗎? – abaghel
我可以通過自定義序列化器以JSON格式或序列化格式存儲數據。 – dchar