2015-05-21 149 views
6

我正在使用Spark Scala API。我有一個Spark SQL數據幀(從Avro的文件中讀取)與下面的模式:如何在flatMap中使用Spark SQL DataFrame?

root 
|-- ids: array (nullable = true) 
| |-- element: map (containsNull = true) 
| | |-- key: integer 
| | |-- value: string (valueContainsNull = true) 
|-- match: array (nullable = true) 
| |-- element: integer (containsNull = true) 

本質上2列[IDS:列表[圖[詮釋,字符串]],匹配:列表[INT]]。

[List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)),List(0, 0, 1, 0)] 
[List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)),List(1, 0, 1, 0)] 
... 

我想這樣做是flatMap()每一行產生3列[ID財產比賽]:看起來像的樣本數據。使用上面兩行作爲輸入數據,我們將得到:

[1,a,0] 
[2,b,0] 
[3,c,1] 
[4,d,0] 
[5,c,1] 
[6,a,0] 
[7,e,1] 
[8,d,0] 
... 

,然後groupByString財產(例如:A,B,...)產生count("property")sum("match")

a 2 0 
b 1 0 
c 2 2 
d 2 0 
e 1 1 

我希望做這樣的事情:

val result = myDataFrame.select("ids","match").flatMap( 
    (row: Row) => row.getList[Map[Int,String]](1).toArray()) 
result.groupBy("property").agg(Map(
    "property" -> "count", 
    "match" -> "sum")) 

的問題是flatMap將DataFrame轉換爲RDD。是否有一種好方法可以使用DataFrames執行flatMap類型的操作,然後使用groupBy

回答

8

flatMap做什麼你想要的?它將每個輸入行轉換爲0或更多行。它可以過濾出來,或者可以添加新的。在SQL中,您可以使用join獲得相同的功能。你能用join做你想做的事嗎?

或者,您也可以看看Dataframe.explode,這僅僅是一個特定種類的join(你可以很容易地手藝自己explode通過加入一個數據幀到UDF)。 explode將單個列作爲輸入,並讓您將其拆分或將其轉換爲多個值,然後將原始行重新轉換回新行。所以:

user  groups 
griffin mkt,it,admin 

將變成:

user  group 
griffin mkt 
griffin it 
griffin admin 

所以,我要說看看DataFrame.explode如果不讓你有輕鬆,嘗試用UDF的連接。

+0

謝謝你的回答! DataFrame.explode方法正是我所需要的。 –

0

我的SQL有點生疏,但您的flatMap中有一個選項可以生成Row對象列表,然後您可以將生成的RDD轉換回DataFrame。

相關問題