2016-10-31 23 views
1

我正在嘗試建立一個隊列研究來跟蹤應用內用戶行爲,並且我想詢問您是否有任何關於如何在使用.join時指定pyspark中的條件的想法) 考慮:內部加入Pyspark進行隊列研究

rdd1 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8', 
    ((u'service1', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'2016-02-08', 
     u'2016-39', 
     u'2016-6', 
     u'2016-2', 
     '2016-10-19'), 
    (u'service2', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'1', 
     u'67.0', 
     u'2016-293', 
     u'2016-42', 
     u'2016-10', 
     '2016-10-19')))]) 


rdd2 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8', 
    ((u'serice1', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'2016-02-08', 
     u'2016-39', 
     u'2016-6', 
     u'2016-2', 
     '2016-10-20'), 
    (u'service2', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'10', 
     u'3346.0', 
     u'2016-294', 
     u'2016-42', 
     u'2016-10', 
     '2016-10-20')))]) 

這兩個RDDS代表信息有關用戶,以「6df99638e4584a618f92a9cfdf318cf8」爲ID,和誰已經登錄服務1和服務2在二零一六年十月十九日和二○一六年十月二十○日。我的客體是加入我的兩個rdds,每個包含至少20000行。所以它必須是內連接。真正的目標是讓所有已在2016-10-19登錄的用戶登錄並在2016-10-20登錄。更具體地說,我的最終目標是在內部連接之後爲rxemple創建rdd2的內容​​。

預期輸出:

[(u'6df99638e4584a618f92a9cfdf318cf8', 
((u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), 
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20')) 
) ] 

一個簡單的加入rdd1.join(RDD2)給我,在邏輯上,包含所有對的元素的與兩個RDDS相匹配的RDD。一個左外連接或一個右外連接並不適合我的土地,因爲我想要一個內連接(只是ID已經存在於rdd1和rdd2中)。

預期輸出:假設我們有兩個字典:dict1 = {'a ':'man','b':女人,'c':'寶貝'}和dict2 = {'a':'Zara','x':芒果,'y':'Celio'}。預期的輸出必須是:output_dict = {'a':'Zara'}。 'a'(關鍵字)在字典1中已經存在,我想要的是來自dict2的關鍵值!

它試圖做到這一點:

​​

此代碼給我一個空RDD。

enter image description here

怎麼辦? PS:我必須處理rdds,而不是數據框!所以我不想將我的rdds轉換爲DataFrame:D 任何幫助表示讚賞。謝謝 !

+0

什麼是預期的輸出? – Yaron

+0

@Yaron:[(u'6df99638e4584a618f92a9cfdf318cf8' , ((u'serice1' , u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A」, u'2016-02-08' , u'2016-39' , u'2016-6' , u'2016-2' , '二○一六年十月二十零日'), (u'service2' , u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A」, ù '10', u'3346.0' , u'2016-294' , u'2016-42' , u'2016-10' , '2016年10月20日'))) ] – DataAddicted

+0

@Yaron :rdd2的內容​​。我正在尋找rdd1(2016-10-19)和rdd2(2016-10-20)中存在的用戶。 – DataAddicted

回答

2

所以,你正在尋找一個加入RDD1集和RDD2的,這將需要從只RDD2鍵和值:

rdd_output = rdd1.join(rdd2).map(lambda (k,(v1,v2)):(k,v2)) 

結果是:

print rdd_output.take(1) 

[(u'6df99638e4584a618f92a9cfdf318cf8', (
(u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), 
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20') 
))] 
+0

聽起來不錯!謝謝@Yaron! – DataAddicted