2016-08-27 123 views
3

我有一個數據幀,並且我想按列進行分組,並使用相同的模式將這些組轉換回數據幀。原因是我想要在整個組中映射一個帶有簽名DataFrame -> String的函數。下面是我想:如何從分組數據中獲取火花數據幀

val df = sc.parallelize(Seq((1,2,3),(1,2,4),(2,3,4))).toDF 
val schema = df.schema 
val groups = df.rdd.groupBy(x => x(0)) 
       .mapValues(g => sqlContext.createDataFrame(sc.makeRDD(g.toList), schema)) 
       .take(1) 

這裏就是我希望的:

scala> groups(0)._2.collect 
Array[org.apache.spark.sql.Row] = Array([1,2,3], [1,2,4])  

,但它不工作(任務與NullPointerException失敗的)......我想你不能地圖一個引用火花上下文的函數,但我不知道如何實現這個功能?

+1

能你提供一個例子。 您期望輸入和輸出什麼? – giaosudau

回答

1

我猜你不能映射是指火花背景

正確的功能 - 你不能傳遞到任何一個函數內使用任何火花的上下文對象(或RDDS,或Dataframes)的Spark的高級函數,因爲這需要將這些對象序列化並將它們發送給執行程序,但它們故意不可序列化,因爲它沒有任何意義(每個執行程序必須像另一個驅動程序應用程序一樣行事) 。

爲了實現只包含一個「組」一個數據幀,我推薦使用的filter代替groupBy:你可以先collect所有組密鑰,然後每一個映射到數據幀過濾:

val df = sc.parallelize(Seq((1,2,3),(1,2,4),(2,3,4))).toDF 

df.cache() // EDIT: this might speed this up significantly, as DF will be reused instead of recalculated for each key 

val groupKeys: Array[Int] = df.map { case Row(i: Int, _, _) => i }.distinct().collect() 
val dfPerKey: Array[DataFrame] = groupKeys.map(k => df.filter($"_1" === k)) 

dfPerKey.foreach(_.show()) 
// prints: 
// +---+---+---+ 
// | _1| _2| _3| 
// +---+---+---+ 
// | 1| 2| 3| 
// | 1| 2| 4| 
// +---+---+---+ 
// 
// +---+---+---+ 
// | _1| _2| _3| 
// +---+---+---+ 
// | 2| 3| 4| 
// +---+---+---+ 
+0

嗨Tzach,感謝羚牛的時間來回答。這是一個聰明的方法,不幸的是它很慢,我認爲它爲每個過濾器做了一個完整的洗牌(對於我的大數據框,它爲每個組運行一個單獨的階段!任何想法如何更接近'groupBy'的性能? – maxymoo

+0

如果您在收集密鑰並迭代它們之前添加'df.cache()',可能會有所幫助 –

+1

感謝Tzach,修復它!感謝您的耐心,我仍然非常新的火花! – maxymoo