2
我有一個流的數據集,從卡夫卡閱讀並試圖寫入CSV如何動態定義流數據集的模式以寫入csv?
case class Event(map: Map[String,String])
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation
val eventDataset: Dataset[Event] = spark
.readStream
.format("kafka")
.load()
.select("value")
.as[Array[Byte]]
.map(decodeEvent)
Event
持有Map[String,String]
內寫入CSV我需要一些架構。
比方說,所有的字段都String
型的,所以我試圖從spark repo
val columns = List("year","month","date","topic","field1","field2")
val schema = new StructType() //Prepare schema programmatically
columns.foreach { field => schema.add(field, "string") }
val rowRdd = eventDataset.rdd.map { event => Row.fromSeq(
columns.map(c => event.getOrElse(c, "")
)}
val df = spark.sqlContext.createDataFrame(rowRdd, schema)
的例子這使運行時錯誤上線「eventDataset.rdd」:
產生的原因: org.apache.spark.sql.AnalysisException:與 查詢必須使用writeStream.start();;執行。
下面不起作用,因爲「.MAP」有一個列表[字符串]沒有元組
eventDataset.map(event => columns.map(c => event.getOrElse(c,""))
.toDF(columns:_*)
有沒有辦法與程序的架構和結構化數據集流來實現這一目標?