2017-05-31 78 views

回答

4

你必須使用正確的反序列化器來解碼字節,比如字符串或你的自定義對象。

如果你不做解碼,你得到[[email protected],它只是Java中字節數組的文本表示。

Kafka對消息的內容一無所知,所以它將字節數組從生產者傳遞給消費者。

火花流,你必須使用鍵和值(報價KafkaWordCount example)序列:

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer") 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer") 

通過上述串行你DStream[String]所以你RDD[String]工作。

如果你想直接將字節數組反序列化到你的自定義類,你必須編寫一個自定義的Serializer(這是Kafka特有的,與Spark無關)。

我推薦的是使用帶有固定模式或Avro的JSON(使用Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages中描述的解決方案)。


Structured Streaming但是管道可以看看如下:

val fromKafka = spark. 
    readStream. 
    format("kafka"). 
    option("subscribe", "topic1"). 
    option("kafka.bootstrap.servers", "localhost:9092"). 
    load. 
    select('value cast "string") // <-- conversion here 
+0

那麼,如何Avro的卡夫卡消息轉換有/無模式的註冊表原始對象星火結構化數據流? –

+0

您必須知道原始對象並使用'map'運算符。沒有'from_avro',但是(如果有的話),因爲我們有'from_json'的JSON。 –

+0

我使用KafkaAvroDeserializer將Array [Byte]映射到我的Avro對象,但它說「無法找到存儲在數據集中的類型的編碼器」。然後我提供編碼器作爲隱含的def toEncoded(o:Zhima):Array [Byte] = o.toByteBuffer.array() fromEncoded(e:Array [Byte])的隱式def:Zhima = valueDeserializer.deserialize(kafkaConsumeTopicName,e)。 asInstanceOf [芝麻] 但它壓縮了同樣的錯誤,那麼如何解決呢? –