0

隨着數據幀稱爲lastTail,我可以重複這樣的:星火斯卡拉據幀單列轉換成JSON用於PostrgeSQL插入

import scalikejdbc._ 
// ... 
// Do Kafka Streaming to create DataFrame lastTail 
// ... 

lastTail.printSchema 

lastTail.foreachPartition(iter => { 

// open database connection from connection pool 
// with scalikeJDBC (to PostgreSQL) 

    while(iter.hasNext) { 
    val item = iter.next() 
    println("****") 
    println(item.getClass) 
    println(item.getAs("fileGid")) 
    println("Schema: "+item.schema) 
    println("String: "+item.toString()) 
    println("Seqnce: "+item.toSeq) 

    // convert this item into an XXX format (like JSON) 
    // write row to DB in the selected format 
    } 
}) 

這個輸出「像」(與密文): root |-- fileGid: string (nullable = true) |-- eventStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true) |-- revisionStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true)

和(只有一個迭代項 - 編輯,但希望具有足夠好的語法)

**** class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 12345 Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false)) String: [12345,[1,4,edit],[1,4,revision]] Seqnce: WrappedArray(12345, [1,4,edit], [1,4,revision])

注:我在https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala上執行val metric = iter.sum這部分,但改爲使用DataFrame。我也在使用「使用foreachRDD的設計模式」,參見http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

我如何轉換這種 org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema (見https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala) 迭代項目爲東西是隨便寫(JSON或...? - 我願意)到PostgreSQL中。 (如果不是JSON,請建議如何將此值讀回到DataFrame以供其他點使用)。

回答

0

那麼我想出了一個不同的方法來解決這個問題。

val ltk = lastTail.select($"fileGid").rdd.map(fileGid => fileGid.toString) 
val ltv = lastTail.toJSON 
val kvPair = ltk.zip(ltv) 

然後,我只是遍歷RDD而不是DataFrame。

kvPair.foreachPartition(iter => { 
    while(iter.hasNext) { 
    val item = iter.next() 
    println(item.getClass) 
    println(item) 
    } 
}) 

的數據不談,我得到class scala.Tuple2這使得存儲在JDBC/PostgreSQL的KV對一個更簡單的方法。

我相信還有其他方法可以解決問題。

+0

更好 - @ zero323指出我這個話題,以改善我的答案的第一部分(即刪除zip) - http://stackoverflow.com/questions/36157810/spark-row-to-json – codeaperature