2014-10-28 97 views
0

我正在與星火java。我有一個JavaPairRDD命名輸出1結合兩個JavaPairRDD

輸出1:

IDCLIENT|INFO|  
1|A|  
1|C|  
1|H|  
5|R|  
2|B| 

,我想創建一個新的JavaPairRDD命名輸出2是一樣輸出1沒有第一行:

Out2:

IDCLIENT2|INFO|  
1|C|  
1|H|  
5|R|  
2|B| 

之後,我想這兩個JavaPairRDD結合起來是這樣的:

Out3的:

IDCLIENT|INFO|IDCLIENT2| 
1|A,C|1| 
1|C,H|1| 
1|H,R|5|  
5|R,B|2|  
2|B| | 

注:我們不能用groupByKey,因爲我們可以有相同的密鑰在不止一行中。

+0

多大此數據集?你能用普通的Scala完成它嗎?如:out1.zip(out1.drop1).map {case(o1,o2)=> combine(o1,o2)}'? – maasg 2014-10-28 15:48:00

回答

1

在RDD中沒有「拖放」操作,因此刪除行比預期的要困難得多。

我會使用zipWithIndex和索引加入。這是一種沉重的,但會完成這項工作:

val indexed1 = out1.zipWithIndex 
val indexed2 = indexed1.map{(k,v) => ((k-1),v)} 
val joined = indexed1 join indexed2 
val out3 = joined.map{case (k,(v1,v2)) => format(v1,v2)} 
// where format gets the values in the desired output layout 

如果數據集將適合在內存中,我只是做了簡單的Scala的oneliner「:

out1.zip(out1.drop(1)).map{case (o1, o2) => format(o1,o2)}