Spark Scala: converting a single DataFrame row to JSON for a PostgreSQL insert

With a DataFrame called lastTail, I can iterate over it like this:
import scalikejdbc._

// ...
// Do Kafka Streaming to create the DataFrame lastTail
// ...

lastTail.printSchema

lastTail.foreachPartition(iter => {
  // open a database connection from the connection pool
  // with scalikeJDBC (to PostgreSQL)
  while (iter.hasNext) {
    val item = iter.next()
    println("****")
    println(item.getClass)
    println(item.getAs("fileGid"))
    println("Schema: " + item.schema)
    println("String: " + item.toString())
    println("Sequence: " + item.toSeq)
    // convert this item into some format (like JSON)
    // write the row to the DB in the selected format
  }
})
This outputs something "like" this (with redactions):

root
 |-- fileGid: string (nullable = true)
 |-- eventStruct: struct (nullable = false)
 |    |-- eventIndex: integer (nullable = true)
 |    |-- eventGid: string (nullable = true)
 |    |-- eventType: string (nullable = true)
 |-- revisionStruct: struct (nullable = false)
 |    |-- eventIndex: integer (nullable = true)
 |    |-- eventGid: string (nullable = true)
 |    |-- eventType: string (nullable = true)
and (for just one iteration item - edited, but hopefully with good enough syntax):
****
class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
12345
Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false))
String: [12345,[1,4,edit],[1,4,revision]]
Sequence: WrappedArray(12345, [1,4,edit], [1,4,revision])
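(For reference, the nested values printed above can also be read straight off each Row with the standard getAs accessors; the field names below follow the schema shown earlier:)

import org.apache.spark.sql.Row

// Inside the while loop above:
val fileGid     = item.getAs[String]("fileGid")          // "12345"
val eventStruct = item.getAs[Row]("eventStruct")         // a nested struct is itself a Row
val eventIndex  = eventStruct.getAs[Int]("eventIndex")   // 1
val eventType   = eventStruct.getAs[String]("eventType") // "edit"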
Note: I do the val metric = iter.sum part from https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala, but with a DataFrame instead. I am also using the "Design Patterns for using foreachRDD" approach, see http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning.
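To make the connection-pool placeholder in the loop concrete, here is a minimal scalikeJDBC sketch; the JDBC URL, credentials, and the events table are made-up placeholders, and the JSON payload is assumed to come from the conversion asked about below:

import scalikejdbc._

lastTail.foreachPartition(iter => {
  // Register the pool once per executor JVM; a guard such as
  // ConnectionPool.isInitialized() is omitted for brevity.
  // The URL, user, and password are placeholders.
  Class.forName("org.postgresql.Driver")
  ConnectionPool.singleton("jdbc:postgresql://localhost:5432/mydb", "user", "pass")

  // Borrow one connection per partition instead of one per row.
  DB.localTx { implicit session =>
    while (iter.hasNext) {
      val item = iter.next()
      val fileGid = item.getAs[String]("fileGid")
      val payload: String = ??? // the row serialized as JSON - see below
      // events(file_gid, payload) is a hypothetical table.
      sql"insert into events (file_gid, payload) values ($fileGid, $payload)".update.apply()
    }
  }
})

Opening the connection once per partition rather than once per record is exactly the point of the foreachRDD/foreachPartition design pattern cited above.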
How do I convert this org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema (see https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala) iteration item into something that is easily written (JSON, or ...? - I'm open) into PostgreSQL? (If not JSON, please suggest how to read this value back into a DataFrame for use at some other point.)
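One direction that seems workable (a sketch under assumptions, not a definitive answer): since each GenericRowWithSchema carries its StructType, a hypothetical helper such as rowToJson below can walk that schema recursively and build a json4s value; only the data types appearing in the schema above are handled:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical helper: converts a Row that carries a schema (such as
// GenericRowWithSchema) into a json4s JValue, recursing into nested structs.
def rowToJson(row: Row): JValue = {
  val fields = row.schema.fields.zipWithIndex.map { case (field, i) =>
    val value: JValue =
      if (row.isNullAt(i)) JNull
      else field.dataType match {
        case _: StructType => rowToJson(row.getStruct(i)) // nested struct
        case IntegerType   => JInt(row.getInt(i))
        case StringType    => JString(row.getString(i))
        case _             => JString(row.get(i).toString) // crude catch-all
      }
    JField(field.name, value)
  }
  JObject(fields.toList)
}

// Per iteration item:
// val payload = compact(render(rowToJson(item)))
// e.g. {"fileGid":"12345","eventStruct":{"eventIndex":1,"eventGid":"4","eventType":"edit"},...}

For the read-back question: sqlContext.read.json can rebuild a DataFrame from JSON strings. There is also DataFrame.toJSON, which serializes whole rows to JSON at the DataFrame level, though that runs before the per-partition loop rather than inside it.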
Even better - @zero323 pointed me to this topic, which improves the first part of my answer (i.e. removes the zip) - http://stackoverflow.com/questions/36157810/spark-row-to-json – codeaperature