2016-03-08 48 views
0

淵源考數據從列表中進行RDD中階及火花

ID, NAME, SEQ, NUMBER 
A, John, 1, 3 
A, Bob, 2, 5 
A, Sam, 3, 1 
B, Kim, 1, 4 
B, John, 2, 3 
B, Ria, 3, 5 

使用MAK ID分組列表,我做了以下

val MapRDD = originDF.map { x => (x.getAs[String](colMap.ID), List(x)) } 
val ListRDD = MapRDD.reduceByKey { (a: List[Row], b: List[Row]) => List(a, b).flatten } 

我的目標是使這個RDD(目的是找SEQ-1的名字和每個ID組在數量差異)

ID, NAME, SEQ, NUMBER, PRE_NAME, DIFF 
A, John, 1, 3, NULL, NULL 
A, Bob, 2, 5, John, 2 
A, Sam, 3, 1, Bob, -4 
B, Kim, 1, 4, NULL, NULL 
B, John, 2, 3, Kim, -1 
B, Ria, 3, 5, John, 2 

目前ListRDD會像

A, ([A,Jone,1,3], [A,Bob,2,5], ..) 
B, ([B,Kim,1,4], [B,John,2,3], ..) 

這是代碼中,我試圖讓我的ListRDD目標RDD(不工作,因爲我想)

def myFunction(ListRDD: RDD[(String, List[Row])]) = { 
    var rows: List[Row] = Nil 
    ListRDD.foreach(row => { 
     rows ::: make(row._2) 
    }) 
    //rows has nothing and It's not RDD 
    } 

    def make(eachList: List[Row]): List[Row] = { 
     caseList.foreach { x => //... Make PRE_NAME and DIFF in new List 
    } 

我的最終目標是拯救這個RDD以CSV(RDD.saveAsFile ...)。如何使用此數據製作此RDD(不是列表)。

+0

你可能想使用'groupBy'方法 –

+0

如果我使用GROUPBY方法 –

+0

我沒有假裝回答你的問題如何合併數據,我只是想開導你 –

回答

1

窗口函數看起來像一個不錯的選擇在這裏:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.lag 

val df = sc.parallelize(Seq(
    ("A", "John", 1, 3), 
    ("A", "Bob", 2, 5), 
    ("A", "Sam", 3, 1), 
    ("B", "Kim", 1, 4), 
    ("B", "John", 2, 3), 
    ("B", "Ria", 3, 5))).toDF("ID", "NAME", "SEQ", "NUMBER") 

val w = Window.partitionBy($"ID").orderBy($"SEQ") 

df.select($"*", 
    lag($"NAME", 1).over(w).alias("PREV_NAME"), 
    ($"NUMBER" - lag($"NUMBER", 1).over(w)).alias("DIFF"))