2015-11-11 48 views
1

假設我有Tuple2一個RDD象下面這樣:如何在java Spark中結合兩個RDD與不同的鍵?

<session1_w1, <0.2, 2>>, 
<session1_w2, <1.3, 4>>, 
<session1_w3, <0.4, 3>>, 
<session2_w1, <0.5, 2>>, 
<session2_w2, <2.3, 6>> 

我需要把它映射到以下RDD,使得最後一個字段是元組的最後場具有相同的部分關鍵字值的總和例如session1

2 + 4 + 3 => 9 
2 + 6 => 8 

所以,我希望得到的結果是:

<session1_w1, 0.2, 9>, 
<session1_w2, 1.3, 9>, 
<session1_w3, 0.4, 9>, 
<session2_w1, <0.5, 8>>, 
<session2_w2, <2.3, 8>> 

這是某種形式的減少,但我不希望失去原有的鑰匙。

我可以通過映射計算總和,然後減少到下面的RDD,但是我需要將此RDD與第一個RDD合併以獲得結果。

<session1, 9> <session2, 8> 

任何想法?

回答

2

您使用groupBy其保留您的RDD的結構(但不保留順序,所以如果你想保存的順序,你必須zipWithIndex後來sortBy索引)。

否則,如果您有RDD[(String,(Double,Int))]

// This should give you an RDD[(Iterative(String,Double),Int)] 
val group = myRDD.groupBy(_._1).map(x => (x._2.map(y => (y._1,y._2._1)), 
              x._2.map(y => y._2._2).reduce(_+_))) 

// This will give you back your RDD of [Summed Int, String, Double] which you can then map. 
val result = group.map(x => (x._2,x._1)).flatMapValues(x => x) 

你也可以做一個簡單的reduceByKey(無雙),後來加入回原來的RDD使得原來的雙打被保留。

========== EDIT ============

第二加入溶液簡單地使用RDD加入。你的原始RDD格式爲RDD[(String,(Double,Int))],我假設你已經獲得了[(String,Int)]的RDD,其中String是session,Int是總和。加入操作很簡單:

RDDOriginal.join(RDDwithSum).map(x=>(x._1,x._2._1._1,x._2._2)) // This should give you the Session (String) followed by the Double and the Int (the sum). 

join方法也不會保留順序,如果你想保持順序,你將不得不做zipWithIndex。

+0

謝謝,但對於您的第二個解決方案,我如何使用不同的密鑰連接兩個RDD?例如,第一個'key = session1_w1',第二個'key = session1' –

+0

@Mashaye你對join的定義是什麼? –

+0

正常情況下,使用同一組鍵在Rdds之間進行連接。我的RDD具有不同的問題,如問題中所述。無論如何,我使用了GameOfThrows提供的第二個解決方案,並且它工作正常。 –

相關問題