
Spark: group RDDs by id

I have two RDDs. In Spark Scala, how can I join event1001RDD and event2009RDD when they share the same id?

val event1001RDD: SchemaRDD = [eventtype, id, location, date1]

[1001,4929102,LOC01,2015-01-20 10:44:39] 
[1001,4929103,LOC02,2015-01-20 10:44:39] 
[1001,4929104,LOC03,2015-01-20 10:44:39] 

val event2009RDD: SchemaRDD = [eventtype, id, date1, date2]

[2009,4929101,2015-01-20 20:44:39,2015-01-20 20:44:39] 
[2009,4929102,2015-01-20 15:44:39,2015-01-20 21:44:39] 
[2009,4929103,2015-01-20 14:44:39,2015-01-20 14:44:39] 
[2009,4929105,2015-01-20 20:44:39,2015-01-20 20:44:39] 

The expected result would be (unique, sorted by id):

[eventtype,id,1001's location,1001's date1,2009's date1,2009's date2]

2009,4929101,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39 
1001,4929102,LOC01,2015-01-20 10:44:39,2015-01-20 15:44:39,2015-01-20 21:44:39 
1001,4929103,LOC02,2015-01-20 10:44:39,2015-01-20 14:44:39,2015-01-20 14:44:39 
1001,4929104,LOC03,2015-01-20 10:44:39,NULL,NULL 
2009,4929105,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39 

Note that for id 4929102, 1001 is used as the eventtype. The 2009 eventtype should only be used when the id has no match among the 1001 events.

The result can be a flat RDD[String], or an RDD of tuples obtained via aggregateByKey. I just need to be able to iterate over the RDD.

Answer


This is a case for a full outer join. Here you go...

from pyspark.sql import Row

# sc is the SparkContext, ssc the SQLContext
d1=[[1001,4929102,"LOC01","2015-01-20 10:44:39"],[1001,4929103,"LOC02","2015-01-20 10:44:39"],[1001,4929104,"LOC03","2015-01-20 10:44:39"]] 
d2=[[2009,4929101,"2015-01-20 20:44:39","2015-01-20 20:44:39"],[2009,4929102,"2015-01-20 15:44:39","2015-01-20 21:44:39"], 
    [2009,4929103,"2015-01-20 14:44:39","2015-01-20 14:44:39"],[2009,4929105,"2015-01-20 20:44:39","2015-01-20 20:44:39"]] 

# Build Row RDDs with prefixed column names so the two sides stay distinguishable after the join
d1RDD = sc.parallelize(d1).map(lambda t: Row(d1_eventtype=t[0],d1_id=t[1],d1_location=t[2],d1_date1=t[3])) 
d2RDD = sc.parallelize(d2).map(lambda t: Row(d2_eventtype=t[0],d2_id=t[1],d2_date1=t[2],d2_date2=t[3])) 
d1DF = ssc.createDataFrame(d1RDD) 
d2DF = ssc.createDataFrame(d2RDD) 
d1DF.printSchema() 
d2DF.printSchema() 
d1DF.show() 
d2DF.show() 

# Full outer join on id; the CASE WHEN expressions prefer the 1001 side
# and fall back to the 2009 side when there is no 1001 match
d1DF.registerTempTable("d1") 
d2DF.registerTempTable("d2") 
res = ssc.sql("select case when d1.d1_eventtype is not null then d1.d1_eventtype else d2.d2_eventtype end et, \
         case when d1.d1_id is not null then d1.d1_id else d2.d2_id end id, \
         d1.d1_location loc, d1.d1_date1, d2.d2_date1, d2.d2_date2 \
       from d1 full outer join d2 on d1.d1_id=d2.d2_id order by d1.d1_id") 
res.show() 
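
An editor's aside, not part of the original answer: the two CASE WHEN expressions can likely be collapsed with coalesce, which returns its first non-null argument; ordering by the coalesced id should also reproduce the asker's expected order (4929101 through 4929105) instead of pushing the unmatched 2009 rows to the front. A sketch, assuming the SQL dialect in use supports coalesce:

# Same full outer join, with coalesce picking the 1001 value when present
res = ssc.sql("select coalesce(d1.d1_eventtype, d2.d2_eventtype) et, \
         coalesce(d1.d1_id, d2.d2_id) id, \
         d1.d1_location loc, d1.d1_date1, d2.d2_date1, d2.d2_date2 \
       from d1 full outer join d2 on d1.d1_id=d2.d2_id \
       order by coalesce(d1.d1_id, d2.d2_id)")

The output shown below comes from the original CASE WHEN query.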

Result:

root 
|-- d1_date1: string (nullable = true) 
|-- d1_eventtype: long (nullable = true) 
|-- d1_id: long (nullable = true) 
|-- d1_location: string (nullable = true) 

root 
|-- d2_date1: string (nullable = true) 
|-- d2_date2: string (nullable = true) 
|-- d2_eventtype: long (nullable = true) 
|-- d2_id: long (nullable = true) 

d1_date1   d1_eventtype d1_id d1_location 
2015-01-20 10:44:39 1001   4929102 LOC01  
2015-01-20 10:44:39 1001   4929103 LOC02  
2015-01-20 10:44:39 1001   4929104 LOC03  
d2_date1   d2_date2   d2_eventtype d2_id 
2015-01-20 20:44:39 2015-01-20 20:44:39 2009   4929101 
2015-01-20 15:44:39 2015-01-20 21:44:39 2009   4929102 
2015-01-20 14:44:39 2015-01-20 14:44:39 2009   4929103 
2015-01-20 20:44:39 2015-01-20 20:44:39 2009   4929105 
et id  loc d1_date1   d2_date1   d2_date2   
2009 4929101 null null    2015-01-20 20:44:39 2015-01-20 20:44:39 
2009 4929105 null null    2015-01-20 20:44:39 2015-01-20 20:44:39 
1001 4929102 LOC01 2015-01-20 10:44:39 2015-01-20 15:44:39 2015-01-20 21:44:39 
1001 4929103 LOC02 2015-01-20 10:44:39 2015-01-20 14:44:39 2015-01-20 14:44:39 
1001 4929104 LOC03 2015-01-20 10:44:39 null    null  
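
The asker mentioned that a flat RDD[String] would be enough to iterate over. A minimal sketch of that last step, an editor's addition rather than part of the original answer: it assumes the res DataFrame from the code above and renders missing values as the literal NULL. The same call chain exists in the Scala API, so the approach carries over to the asker's Spark Scala setting.

# Drop to the underlying RDD of Rows and join each Row's fields into one comma-separated string
flat = res.rdd.map(lambda r: ",".join("NULL" if v is None else str(v) for v in r)) 
for line in flat.collect(): 
    print line 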

This is exactly what I needed. Thanks ayan! :) – sophie


Hi Ayan, I need to update the SQL because I now have 3 RDDs, could you take a look? Thanks http://stackoverflow.com/questions/30472975/spark-group-rdd-sql-query – sophie


Answered, mate.... Please feel free to let me know if it works (or not) –