2017-05-16 46 views
-3

我是新來的火花,我試圖過濾包含出現在所有其他rdds中的項目的最終rdd。獲取出現在所有rdds中的項目 - Pyspark

我的代碼

a = ['rs1','rs2','rs3','rs4','rs5'] 
b = ['rs3','rs7','rs10','rs4','rs6'] 
c = ['rs10','rs13','rs20','rs16','rs1'] 
d = ['rs2', 'rs4', 'rs5', 'rs13', 'rs3'] 

a_rdd = spark.parallelize(a) 
b_rdd = spark.parallelize(b) 
c_rdd = spark.parallelize(c) 
d_rdd = spark.parallelize(d) 

rdd = spark.union([a_rdd, b_rdd, c_rdd, d_rdd]).distinct() 

結果:[ 'RS4', 'RS16', 'RS5', 'RS6', 'RS7', 'RS20', 'RS1', 'RS13',「RS10 」, 'RS2', 'RS3']

我的預期結果爲:[ 'RS3', 'RS4']

謝謝!

+0

我建議您閱讀有關文檔的更多信息。 https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.distinct。嘗試檢查內部連接。 –

+0

我的不好,它沒有找到那個API文檔頁面,我會花更多時間在上面謝謝 – pthphap

回答

1

當你說你想要一個包含所有rdds中的項目的rdd時,你的意思是交叉點?如果這是你不應該使用聯盟和你的RDDS的交集的情況下是空的(沒有元素反覆在4個RDDS)

,但如果你需要做的RDDS的交集:

def intersection(*args): 
     return reduce(lambda x,y:x.intersection(y),args) 

    a = ['rs1','rs2','rs3','rs4','rs5'] 
    b = ['rs3','rs7','rs1','rs2','rs6'] 
    c = ['rs10','rs13','rs2','rs16','rs1'] 
    d = ['rs2', 'rs4', 'rs1', 'rs13', 'rs3'] 

    a_rdd = sc.parallelize(a) 
    b_rdd = sc.parallelize(b) 
    c_rdd = sc.parallelize(c) 
    d_rdd = sc.parallelize(d) 

    rdd = sc.union([a_rdd, b_rdd, c_rdd, d_rdd]).distinct() 
    intersection(a_rdd, b_rdd, c_rdd, d_rdd).collect() 

輸出結果爲['rs1','rs2']

+0

我有一個建議'reduce'你可以這樣添加:'reduce(RDD.intersection,args)' –

+1

啊是的這是一個更優雅的方式:) –

+0

這工作就像一個魅力。謝謝 – pthphap