2017-10-12 34 views
0

在PySpark,我有2 RDD的其結構爲(鍵,名單列表):Pyspark:使用地圖功能,而不是收集迭代RDDS

input_rdd.take(2) 
[(u'100', 
    [[u'36003165800', u'70309879', u'1']]), 
(u'200', 
    [[u'5196352600', u'194837393', u'99']]) ] 

output_rdd.take(2) 
[(u'100', 
    [[u'875000', u'5959', u'1']]), 
(u'300', [[u'16107000', u'12428', u'1']])] 

現在我想要一個結果RDD(如圖所示下面),它將基於鍵的兩個RDD分組,並按順序給出輸出爲元組(鍵,(,))。當輸入或輸出中沒有任何鍵時,那麼該rdd的列表保持爲空。

[(u'100', 
([[[u'36003165800', u'70309879', u'1']]], 
[[[u'875000', u'5959', u'1']]]), 
(u'200', 
([[[u'5196352600', u'194837393', u'99']]], 
    [])), 
(u'300',([],[[[u'16107000', u'12428', u'1']]]) 
] 

爲了獲得所得RDD我使用的下面的代碼段使用

resultant=sc.parallelize(x, tuple(map(list, y))) for x,y in sorted(list(input_rdd.groupWith(output_rdd).collect())) 

是否與groupWith功能辦法可以除去.collect(),而使用.MAP()在Pyspark中獲得相同的結果RDD?

+0

給空RDD – raul

回答

0

完全外部聯結給出:

input_rdd.fullOuterJoin(output_rdd).collect() 
# [(u'200', ([[u'5196352600', u'194837393', u'99']], None)), 
# (u'300', (None, [[u'16107000', u'12428', u'1']])), 
# (u'100', ([[u'36003165800', u'70309879', u'1']], [[u'875000', u'5959', u'1']]))] 

要更換None[]

input_rdd.fullOuterJoin(output_rdd).map(lambda x: (x[0], tuple(i if i is not None else [] for i in x[1]))).collect() 

# [(u'200', ([[u'5196352600', u'194837393', u'99']], [])), 
# (u'300', ([], [[u'16107000', u'12428', u'1']])), 
# (u'100', ([[u'36003165800', u'70309879', u'1']], [[u'875000', u'5959', u'1']]))]