我從卡夫卡獲取數據,然後使用默認解碼器對Array[Byte]
進行反序列化,之後我的RDD元素看起來像(null,[[email protected])
,(null,[[email protected])
但我希望我的原始數據具有模式,那麼如何實現這一點?如何將字節從Kafka轉換爲原始對象?
我以Avro格式序列化消息。
我從卡夫卡獲取數據,然後使用默認解碼器對Array[Byte]
進行反序列化,之後我的RDD元素看起來像(null,[[email protected])
,(null,[[email protected])
但我希望我的原始數據具有模式,那麼如何實現這一點?如何將字節從Kafka轉換爲原始對象?
我以Avro格式序列化消息。
你必須使用正確的反序列化器來解碼字節,比如字符串或你的自定義對象。
如果你不做解碼,你得到[[email protected]
,它只是Java中字節數組的文本表示。
Kafka對消息的內容一無所知,所以它將字節數組從生產者傳遞給消費者。
火花流,你必須使用鍵和值(報價KafkaWordCount example)序列:
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
通過上述串行你DStream[String]
所以你RDD[String]
工作。
如果你想直接將字節數組反序列化到你的自定義類,你必須編寫一個自定義的Serializer(這是Kafka特有的,與Spark無關)。
我推薦的是使用帶有固定模式或Avro的JSON(使用Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages中描述的解決方案)。
在Structured Streaming但是管道可以看看如下:
val fromKafka = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
select('value cast "string") // <-- conversion here
那麼,如何Avro的卡夫卡消息轉換有/無模式的註冊表原始對象星火結構化數據流? –
您必須知道原始對象並使用'map'運算符。沒有'from_avro',但是(如果有的話),因爲我們有'from_json'的JSON。 –
我使用KafkaAvroDeserializer將Array [Byte]映射到我的Avro對象,但它說「無法找到存儲在數據集中的類型的編碼器」。然後我提供編碼器作爲隱含的def toEncoded(o:Zhima):Array [Byte] = o.toByteBuffer.array() fromEncoded(e:Array [Byte])的隱式def:Zhima = valueDeserializer.deserialize(kafkaConsumeTopicName,e)。 asInstanceOf [芝麻] 但它壓縮了同樣的錯誤,那麼如何解決呢? –