0

我想寫一個DataFrame從Spark到Kafka,我找不到任何解決方案。你能告訴我怎麼做嗎?如何編寫DataFrame(從foreach中的RDD構建)到Kafka?

這裏是我當前的代碼:

activityStream.foreachRDD { rdd => 
    val activityDF = rdd 
    .toDF() 
    .selectExpr(
     "timestamp_hour", "referrer", "action", 
     "prevPage", "page", "visitor", "product", "inputProps.topic as topic") 
    val producerRecord = new ProducerRecord(topicc, activityDF) 

    kafkaProducer.send(producerRecord) // <--- this shows an error 
    } 

類型不匹配;發現:org.apache.kafka.clients.producer.ProducerRecord [Nothing,org .apache.spark.sql.Da taFrame](展開後)org.apache.kafka.clients.producer.ProducerRecord [Nothing,org .apache.spark.sql.Da taset [org.apache.spa rk.sql.Row]]必需:org.apache.kafka.clients.producer.ProducerRecord [Nothing,Str ing]在涉及的應用程序中發生錯誤默認參數。

+1

類型不匹配;發現:org.apache.kafka.clients.producer.ProducerRecord [Nothing,org.apache.spark.sql.DataFrame](擴展爲)org.apache.kafka.clients.producer.ProducerRecord [Nothing,org.apache.spark .sql.Dataset [org.apache.spark.sql.Row]]必需:org.apache.kafka.clients.producer.ProducerRecord [Nothing,String]涉及默認參數的應用程序中發生錯誤。 –

+0

你可以添加/粘貼編譯器錯誤到你的問題? –

回答

0

activityDF上做collect得到記錄(不是Dataset[Row])並將它們保存到Kafka。

請注意,您最終將在collect之後收集一系列記錄,因此您可能必須對它進行迭代,例如,上

val activities = activityDF.collect() 
// the following is pure Scala and has nothing to do with Spark 
activities.foreach { a: Row => 
    val pr: ProducerRecord = // map a to pr 
    kafkaProducer.send(pr) 
} 

使用模式匹配來解構它到字段/列,例如

activities.foreach { case Row(timestamp_hour, referrer, action, prevPage, page, visitor, product, topic) => 
    // ...transform a to ProducerRecord 
    kafkaProducer.send(pr) 
} 

普羅蒂普:我強烈建議使用case class和變換DataFrame(= Dataset[Row])至Dataset[YourCaseClass]

請參閱Spark SQL的Row和Kafka的ProducerRecord文檔。


由於Joe Nate在評論中指出:

如果你寫任何端點之前「收」,這將讓所有的數據彙總在驅動程序,然後讓司機寫出來。 1)如果數據太多,可能會導致驅動程序崩潰(2)寫入沒有並行性。

這100%正確。我希望我說過:) :)

您可能想要使用Writing Stream Output to Kafka中所述的方法。

+2

如果您在寫入任何端點之前進行「收集」操作,將會在驅動程序中彙總所有數據,然後讓驅動程序將其寫出。 1)如果數據太多,可能會導致驅動程序崩潰(2)寫入沒有並行性。 –

+0

更好的解決方案在這裏:https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/09%20Write%20Output%20To%20Kafka.html –