2016-05-26 100 views
3

我在https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html上看到了一個Dataframes教程,這個教程寫在Python。我正試圖將它翻譯成Scala火花在地圖中創建行

他們有下面的代碼:

df = context.load("/path/to/people.json") 
# RDD-style methods such as map, flatMap are available on DataFrames 
# Split the bio text into multiple words. 
words = df.select("bio").flatMap(lambda row: row.bio.split(" ")) 
# Create a new DataFrame to count the number of words 
words_df = words.map(lambda w: Row(word=w, cnt=1)).toDF() 
word_counts = words_df.groupBy("word").sum() 

於是,我第一次看到從csv數據到一個數據幀df後來才知​​道有:

val title_words = df.select("title").flatMap { row =>  
    row.getAs[String("title").split(" ") } 
val title_words_df = title_words.map(w => Row(w,1)).toDF() 
val word_counts = title_words_df.groupBy("word").sum() 

,但我不知道:

  1. 如何將字段名稱分配到行中的行開頭與VAL title_words_df nning = ...

  2. 我有錯誤 「的值toDF不是org.apache.spark.rdd.RDD [org.apache.spark.sql.Row]成員」

在此先感謝您的幫助。

回答

2

如何將字段名分配到行

的Python 是完全不同類型的對象比其對應的Scala的。它是一個增加了名稱的元組,使它與無類型集合(o.a.s.sql.Row)相比更加等效於產品類型。

我有錯誤 「的值toDF不是org.apache.spark.rdd.RDD [org.apache.spark.sql.Row]成員」

由於o.a.s.sql.Row是基本上是無類型的,它不能與toDF一起使用,並且需要createDataFrame具有明確的模式。

import org.apache.spark.sql.types._ 

val schema = StructType(Seq(
    StructField("word", StringType), StructField("cnt", LongType) 
)) 

sqlContext.createDataFrame(title_words.map(w => Row(w, 1L)), schema) 

如果你想你的代碼相當於你應該使用的產品類型,而不是的Python版本。這意味着無論是Tuple

title_words.map((_, 1L)).toDF("word", "cnt") 

或案例類:

case class Record(word: String, cnt: Long) 

title_words.map(Record(_, 1L)).toDF 

但在實踐中,應該不需要使用RDDS:

import org.apache.spark.sql.functions.{explode, lit, split} 

df.select(explode(split($"title", " ")), lit(1L))