2017-10-14 76 views
0

我試圖在第一列作爲關鍵字執行兩個RDD之間的連接。該RDDS樣子:spark reducebykey並忽略其餘

RDD1: 
(k1,(s11,s12,s13)) 
(k2,(s21,s22,s23)) 
(k3,(s31,s32,s33)) 
... 

RDD2: 
(k1,(t11,t12,t13)) 
(k2,(t21,t22,t23)) 
(k4,(t41,t42,t43)) 
... 

文從一個RDD可能或不可能找到另外一個匹配。但是,如果確實找到了匹配項,它將只與其他RDD的一行相匹配。換句話說,ki是兩個RDD的主鍵。

我被

RDD3=RDD1.union(RDD2).reduceByKey(lambda x,y:(x+y)).filter(lambda x:len(x[1])==6) 

這樣做的結果RDD會是什麼樣子:

RDD3: 
(k1,(s11,s12,s13,t11,t12,t13)) 
(k2,(s21,s22,s23,t21,t22,t23)) 
... 

我想避免使用filter功能,而計算RDD3。它看起來像一個可避免的計算。使用內置火花功能可以做到這一點嗎?我不想用火花SQL或dataframes

回答

1

您需要join方法後跟一個mapValues方法從相同的密鑰串連值:

rdd1.join(rdd2).mapValues(lambda x: x[0] + x[1]).collect() 
# [('k2', ('s21', 's22', 's23', 't21', 't22', 't23')), 
# ('k1', ('s11', 's12', 's13', 't11', 't12', 't13'))]