2016-09-14

Spark: fetching data from a complex DataFrame schema with map

I have the following structure:

json.select($"comments").printSchema 

root 
|-- comments: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- comment: struct (nullable = true) 
| | | |-- date: string (nullable = true) 
| | | |-- score: string (nullable = true) 
| | | |-- shouts: array (nullable = true) 
| | | | |-- element: string (containsNull = true) 
| | | |-- tags: array (nullable = true) 
| | | | |-- element: string (containsNull = true) 
| | | |-- text: string (nullable = true) 
| | | |-- username: string (nullable = true) 
| | |-- subcomments: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- date: string (nullable = true) 
| | | | |-- score: string (nullable = true) 
| | | | |-- shouts: array (nullable = true) 
| | | | | |-- element: string (containsNull = true) 
| | | | |-- tags: array (nullable = true) 
| | | | | |-- element: string (containsNull = true) 
| | | | |-- text: string (nullable = true) 
| | | | |-- username: string (nullable = true) 

I want to get an array/list of comments as [username, score, text]. Normally, in PySpark, I would do something like this:

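from pyspark.sql import Row

comments = (json
    .select("comments")
    .rdd  # on Spark 2.x, flatMap lives on the RDD, not on the DataFrame
    .flatMap(lambda element:
        map(lambda item:
            # per the schema above, the fields sit under the nested "comment" struct
            Row(username=item.comment.username,
                score=item.comment.score,
                text=item.comment.text),
            element[0]))
    .toDF())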

However, when I try the same approach in Scala:

json.select($"comments").rdd.map{row: Row => row(0)}.take(3) 

I get some strange output:

Array[Any] = 
Array(
    WrappedArray([[string,string,WrappedArray(),WrappedArray(),,string] ...], ...) 

Is there a way to do this in Scala as easily as in Python?

Also, how do I iterate over a WrappedArray like an array/list? I get an error like this:

error: scala.collection.mutable.WrappedArray.type does not take parameters 

Answers

How about using a statically typed Dataset instead?

case class Comment(
    date: String, score: String, 
    shouts: Seq[String], tags: Seq[String], 
    text: String, username: String 
) 

// Assumes org.apache.spark.sql.functions.explode and spark.implicits._ are imported
df 
    .select(explode($"comments.comment").alias("comment")) 
    .select("comment.*") 
    .as[Comment] 
    .map(c => (c.username, c.score, c.text)) 

This can be simplified further if you don't depend on the REPL:

df 
    .select("comments.comment") 
    .as[Seq[Comment]] 
    .flatMap(_.map(c => (c.username, c.score, c.text))) 
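
To make the shape of that flatMap easier to see, here is a minimal plain-Scala sketch that needs no Spark at all. It assumes each row has already been decoded into a Seq[Comment]; the FlattenSketch object, its flatten helper, and the sample values are made up for illustration and are not part of the Spark API.

```scala
// Plain-Scala analogue, no Spark needed: each "row" holds a Seq[Comment].
final case class Comment(
  date: String, score: String,
  shouts: Seq[String], tags: Seq[String],
  text: String, username: String
)

object FlattenSketch {
  // Hypothetical helper: the same lambda as in the Dataset version,
  // applied to an in-memory collection instead of a Dataset
  def flatten(rows: Seq[Seq[Comment]]): Seq[(String, String, String)] =
    rows.flatMap(_.map(c => (c.username, c.score, c.text)))

  def main(args: Array[String]): Unit = {
    // Made-up sample data, for illustration only
    val rows = Seq(
      Seq(
        Comment("2016-09-01", "3", Seq.empty, Seq("spark"), "first", "alice"),
        Comment("2016-09-02", "1", Seq.empty, Seq.empty, "second", "bob")
      ),
      Seq(Comment("2016-09-03", "5", Seq("hi"), Seq.empty, "third", "carol"))
    )
    flatten(rows).foreach(println)
  }
}
```

The outer flatMap removes one level of nesting (rows), while the inner map turns each comment into a tuple, which is what the Dataset version does per row.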

If you really want to deal with Rows, use the typed getters:

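import org.apache.spark.sql.Row

df.rdd.flatMap(
    _.getAs[Seq[Row]]("comments") 
    .map(_.getAs[Row]("comment")) 
    .map { 
     // You could also use _.getAs[String]("score") or getString(1) 
     // Field order follows the schema: date, score, shouts, tags, text, username 
     case Row(_, score: String, _, _, text: String, username: String) => 
     (username, score, text) 
    } 
)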
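
As for iterating over a WrappedArray: it is an ordinary Scala Seq, so it can be traversed, mapped, and indexed like any other list. The error in the question probably comes from applying the WrappedArray companion object as if it were a constructor, though that is a guess from the message alone. Below is a small sketch in plain Scala, without Spark; the WrappedArrayDemo object and its describe helper are hypothetical names used only for illustration.

```scala
object WrappedArrayDemo {
  // Hypothetical helper: labels each element with its position
  def describe(xs: scala.collection.Seq[String]): List[String] =
    xs.zipWithIndex.map { case (x, i) => s"$i:$x" }.toList

  def main(args: Array[String]): Unit = {
    // Typing an Array as a Seq wraps it implicitly, which is similar
    // to the values Spark hands back for array columns
    val shouts: scala.collection.Seq[String] = Array("hey", "ho")

    shouts.foreach(println)             // iterate like any list
    println(shouts.map(_.toUpperCase))  // transform like any list
    println(shouts(0))                  // index like any list
    println(describe(shouts))
  }
}
```

In Spark code the same idea applies by asking for the column as a Seq, for example with getAs[Seq[String]], rather than naming WrappedArray directly.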