2017-03-10 58 views
0

我有一個原始的RDD數據的RDD(鑰匙,ID),看起來有點像:合併與RDD(K1,K2)

(A,A) 
(A,B) 
(B,C) 
(C,D) 

這些都是在一個圖形邊緣(表示爲頂點的名字)我使用一些代碼來生成具有唯一ID的第二個RDD。

(A,0) 
(B,41) 
(C,82) 
(D,123) 

我想以某種方式合併這些RDDS得到一個最終的RDD看起來像:

Edge(0,0,AA) 
Edge(0,41,AB) 
Edge(41,82,BC) 
Edge(82,123,CD) 

基本上創建邊緣[RDD]這樣我就可以在這些邊緣使用graphx。是否有可能將Id RDD與原始邊緣RDD合併?

+0

以下任何一條都有幫助? – mtoto

+0

@mtoto我不得不重寫一堆後端代碼,所以我一直無法測試。我認爲@semsorock可能有一個很好的解決方案。我正在考慮現在使用純RDD,並使用'join'將所有配對都按我的想法配對。 –

回答

0

你可以嘗試這樣的事情:

val df1 = rdd1.toDF("col1", "col2") 
val df2 = rdd2.toDF("col", "val") 

df1.join(df2, $"col1" === $"col").drop(col("col")).join(df2, $"col2" === $"col").drop(col("col")).show 

+----+----+---+---+ 
|col1|col2|val|val| 
+----+----+---+---+ 
| A| B| 0| 41| 
| C| D| 82|123| 
| B| C| 41| 82| 
| A| A| 0| 0| 
+----+----+---+---+ 
0

如果你的ID rdd不是太大,你可以播放它創建邊緣rdd

// Create broadcast variable from id _rdd 
val bc_lookup = sc.broadcast(rdd_id.collectAsMap()) 

// Create lookup function that returns intermediate rdd 
def lookup_custom(x: (String, String)): (Int,Int,String) = { 
    (bc_lookup.value.get(x._1).get, 
    bc_lookup.value.get(x._2).get, 
    x._1 + x._2) 
} 

val rdd_result = my_rdd.map(x => lookup_custom(x)).cache() 

// Convert to Edge RDD 
val e_rdd = rdd_result.map(x => Edge(x._1, x._2, x._3)) 

e_rdd.collect() 
// res1: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(0,0,AA), Edge(0,41,AB), Edge(41,82,BC), Edge(82,123,CD)) 

數據

val my_rdd = sc.parallelize(Seq(("A","A"),("A","B"),("B","C"),("C", "D"))) 
val rdd_id = sc.parallelize(Seq(("A",0),("B",41),("C",82),("D",123)))