2016-06-08 22 views
0

例如我在PySpark 2個RDDS:PySpark,路口由Key

((0,0), 1) 
((0,1), 2) 
((1,0), 3) 
((1,1), 4) 

和第二就是

((0,1), 3) 
((1,1), 0) 

我想有從第一RDD交叉口第二個。實際上,第二個RDD必須扮演第一個面具的角色。輸出應爲:

((0,1), 2) 
((1,1), 4) 

這意味着從第一RDD的值,但僅用於從所述第二密鑰。兩個RDD的長度都不相同。

我有一些解決方案(已證明),但這樣的事情:

rdd3 = rdd1.cartesian(rdd2) 
rdd4 = rdd3.filter(lambda((key1, val1), (key2, val2)): key1 == key2) 
rdd5 = rdd4.map(lambda((key1, val1), (key2, val2)): (key1, val1)) 

我不知道,是多麼有效此解決方案。希望聽到有經驗的Spark程序員的意見......

回答

2

也許我們不應該把這個過程想象成加入。你不是真的希望加入兩個數據集,你想從另一個數據集中減去一個數據集?

我要說明什麼,我從你的問題

  1. 你不關心的第二個數據集的數值假設,在所有。
  2. 您只想保留第一個數據集中鍵值對出現在第二個數據集中的值。

理念1:協同組(我想可能是最快的方法)。它基本上是計算兩個數據集的交集。

rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)]) 
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)]) 
intersection = rdd1.cogroup(rdd2).filter(lambda x: x[1][0] and x[1][1]) 
final_rdd = intersection.map(lambda x: (x[0], list(x[1][0]))).map(lambda (x,y): (x, y[0])) 

理念2:減去通過重點

rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)]) 
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)]) 

unwanted_rows = rdd1.subtractByKey(rdd2) 
wanted_rows = rdd1.subtractByKey(unwanted_rows) 

我不是100%肯定,如果這是比你的方法快。它確實需要兩個subtractByKey操作,這可能會很慢。此外,此方法不保留順序(例如,((0, 1), 2),儘管在您的第一個數據集中是第一個,在最終數據集中是第二個)。但我無法想象這很重要。

至於哪個更快,我認爲這取決於您的cartersian加入需要多長時間。映射和過濾往往比subtractByKey所需的混洗操作更快,但當然cartesian是一個耗時的過程。

無論如何,我想你可以嘗試這種方法,看看它是否適合你!


性能改進的旁註,取決於RDD的大小。

如果rdd1足夠小,可以保存在主存儲器中,如果您廣播它,然後流反對它,減法過程可以非常迅速。但是,我承認這種情況很少。

+0

非常感謝。我想,應該是按照關鍵結果對RDD進行分類的一種方法。再次感謝你。 – Guforu

+0

我也建議看看'十字路口'。現在看來它只適用於整行匹配的情況,這對您的用例不起作用,但我想知道您是否可以操縱'intersection'來僅處理鍵值對的交集。如果我弄清楚,我會更新我的答案。 –

+0

十字路口是一點點其他故事,我真的找鑰匙的交叉點,而不是價值觀(當然你也告訴我)。但我會花一點時間分析你的建議。謝謝 – Guforu