2016-11-21 123 views
0

我使用PySpark,我正在尋找一種方法來檢查:通過篩選值RDD PySpark

對於給定的check_number = 01

如果第三元素在我的rdd1值不包含check_number ==>獲取所有信息有關從rdd2這個check_number ..

考慮:

rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=', 
         u'serviceXXX', 
         u'testAB_02', 
         u'2016-07-03')]) 

假設第一個元素是ID,第二個元素是服務名稱,第三個元素是測試名稱,第ID,第四個元素是日期。

rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a', 
         u'01', 
         u'2016-11-02'), 

         (u'XXXX52547412558933nnBlmquhdyhM', 
         u'02', 
         u'2016-11-04')]) 

假設第一個元素是ID,第二個元素是測試ID,最後一個元素是日期。

所以,我在我的rdd1testAB_02,這不符合我的check_number(所以服務名稱必須以check_number的值結尾)。我的對象如果是從rdd2獲得所有行,並且01作爲測試ID。這裏的預期輸出必須是:

[(u'9b023b8233c242c09b93506942002e0a', 
    u'01', 
    u'2016-11-02') 

這是我的代碼:

def update_typesdecohorte_table(rdd1, rdd2): 

    if rdd1.filter(lambda x : (re.match('.*?' + check_number, x[2]))).isEmpty() is True: 

     new_rdd2 = rdd2.filter(lambda x : x[1] == check_number) 

    else: 

     pass 

    return new_rdd2 

new_rdd2 = update_typesdecohorte_table(rdd1, rdd2) 

至極給出:

[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')] 

此代碼的工作,但我不喜歡的方法..什麼是最有效的方法來做到這一點?

回答

1

如果你想從RDD2那些在RDD1集沒有匹配的元素中的所有記錄,你可以使用cartesian

new_rdd2 = rdd1.cartesian(rdd2) 
    .filter(lambda r: not r[0][2].endswith(r[1][1])) 
    .map(lambda r: r[1]) 

如果您check_number是固定的,在該值端過濾:

new_rdd2.filter(lambda r: r[1] == check_number).collect() 

但是,如果你的check_number是固定的,並且兩個RDD都很大,它將比你的解決方案慢,因爲它在連接期間需要在分區上進行混洗(你的代碼只執行非混洗轉換)。

+0

Thx Mariusz!這就是我想要的,而且這個方法看起來很健壯! – DataAddicted