
I have the following data in table T1. How can I convert the rows to columns using Spark SQL?

col1 | col2 | 
sess-1 | read | 
sess-1 | meet | 
sess-1 | walk | 
sess-2 | watch | 
sess-2 | sleep | 
sess-2 | run | 
sess-2 | drive | 

Expected output:

col1 | col2     | 
sess-1 | read,meet,walk  | 
sess-2 | watch,sleep,run,drive | 

I am using Spark 1.4.0.

Answers


Check Spark's aggregateByKey:

scala> val babyNamesCSV = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5))) 
babyNamesCSV: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12 


scala> babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect 
res1: Array[(String, Int)] = Array((Abby,9), (David,11)) 

The example above should help with understanding: with a zero value of 0 and both functions adding, the counts for each name are summed (Abby: 4+5=9, David: 6+5=11).

Or see Aggregator: https://spark.apache.org/docs/0.6.0/api/core/spark/Aggregator.html
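As a rough sketch (not part of the original answer), the same aggregateByKey pattern could be applied to the question's T1 data to build the comma-separated strings. Note that the order of values within a group is not guaranteed:

// sketch: collect the strings for each key and join them with commas
scala> val t1 = sc.parallelize(List(("sess-1","read"), ("sess-1","meet"), ("sess-1","walk"),
     |   ("sess-2","watch"), ("sess-2","sleep"), ("sess-2","run"), ("sess-2","drive")))

scala> val rolledUp = t1.aggregateByKey(List.empty[String])(
     |   (acc, v) => v :: acc,   // seqOp: prepend each value to the per-partition list
     |   (a, b) => a ::: b)      // combOp: merge lists from different partitions
     |   .mapValues(_.mkString(","))

scala> rolledUp.collect()
// e.g. Array((sess-1,walk,meet,read), (sess-2,drive,run,sleep,watch)) -- ordering may vary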


Thanks for your reply... what I asked is a bit different. I figured it out by doing some R&D, and I'm posting it below.

// create RDD data
scala> val data = sc.parallelize(List(("sess-1","read"), ("sess-1","meet"),
    ("sess-1","walk"), ("sess-2","watch"), ("sess-2","sleep"),
    ("sess-2","run"), ("sess-2","drive")))
data: org.apache.spark.rdd.RDD[(String, String)] =
ParallelCollectionRDD[211] at parallelize at <console>:26

// groupByKey returns an Iterable[String] (a CompactBuffer) per key
scala> val dataCB = data.groupByKey()
dataCB: org.apache.spark.rdd.RDD[(String, Iterable[String])] =
ShuffledRDD[212] at groupByKey at <console>:30

// map the CompactBuffer to a List
scala> val tx = dataCB.map{case (col1,col2) => (col1,col2.toList)}.collect
tx: Array[(String, List[String])] = Array((sess-1,List(read, meet,
walk)), (sess-2,List(watch, sleep, run, drive)))

// groupByKey and map to List can also be achieved in one statement
scala> val dataCB = data.groupByKey().map{case (col1,col2)
    => (col1,col2.toList)}.collect
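
To match the expected output exactly, the lists can be joined into comma-separated strings, e.g. with mkString. This last step is a sketch added here, not part of the original transcript:

// join each group's values into a single comma-separated string
scala> val result = data.groupByKey()
     |   .map { case (col1, col2) => (col1, col2.toList.mkString(",")) }
     |   .collect()
// e.g. Array((sess-1,read,meet,walk), (sess-2,watch,sleep,run,drive))
// ordering within a group is not guaranteed in general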