I want to write a DataFrame from Spark to Kafka, and I haven't found any solution. Can you tell me how to do it? How can I write a DataFrame (built from an RDD inside foreach) to Kafka?
Here is my current code:
activityStream.foreachRDD { rdd =>
  val activityDF = rdd
    .toDF()
    .selectExpr(
      "timestamp_hour", "referrer", "action",
      "prevPage", "page", "visitor", "product", "inputProps.topic as topic")
  val producerRecord = new ProducerRecord(topicc, activityDF)
  kafkaProducer.send(producerRecord) // <--- this shows an error
}
type mismatch;
 found   : org.apache.kafka.clients.producer.ProducerRecord[Nothing,org.apache.spark.sql.DataFrame]
    (which expands to)  org.apache.kafka.clients.producer.ProducerRecord[Nothing,org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]
 required: org.apache.kafka.clients.producer.ProducerRecord[Nothing,String]
Error occurred in an application involving default arguments.
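The mismatch happens because `ProducerRecord` expects a `String` value, while the code passes the whole DataFrame. One common approach is to serialize each row (for example as JSON) and send the records individually from inside each partition. The sketch below assumes Spark 2.x, where `toJSON` returns a `Dataset[String]`; the names `kafkaBrokers` and `topic` are placeholders for your own configuration:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

activityStream.foreachRDD { rdd =>
  val activityDF = rdd
    .toDF()
    .selectExpr(
      "timestamp_hour", "referrer", "action",
      "prevPage", "page", "visitor", "product", "inputProps.topic as topic")

  // Serialize each Row to a JSON string, then send row-by-row.
  // A producer is created per partition because KafkaProducer
  // is not serializable and cannot be shipped from the driver.
  activityDF.toJSON.foreachPartition { partition: Iterator[String] =>
    val props = new Properties()
    props.put("bootstrap.servers", kafkaBrokers) // placeholder, e.g. "host:9092"
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { json =>
      // topic is a placeholder; if each row carries its own topic
      // (inputProps.topic), parse the JSON and use that instead.
      producer.send(new ProducerRecord[String, String](topic, json))
    }
    producer.close()
  }
}
```

This is a sketch rather than a drop-in fix; creating one producer per partition per micro-batch works but is not the cheapest option, and with Spark 2.2+ you may prefer the built-in Kafka sink (`df.selectExpr("to_json(struct(*)) AS value").write.format("kafka")`) instead of a hand-rolled producer.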
Could you add/paste the compiler error into your question? –