也許我們不應該把這個過程想象成加入。你不是真的希望加入兩個數據集,你想從另一個數據集中減去一個數據集?
我要說明什麼,我從你的問題
- 你不關心的第二個數據集的數值假設,在所有。
- 您只想保留第一個數據集中鍵值對出現在第二個數據集中的值。
理念1:協同組(我想可能是最快的方法)。它基本上是計算兩個數據集的交集。
rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])
intersection = rdd1.cogroup(rdd2).filter(lambda x: x[1][0] and x[1][1])
final_rdd = intersection.map(lambda x: (x[0], list(x[1][0]))).map(lambda (x,y): (x, y[0]))
理念2:減去通過重點
rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])
unwanted_rows = rdd1.subtractByKey(rdd2)
wanted_rows = rdd1.subtractByKey(unwanted_rows)
我不是100%肯定,如果這是比你的方法快。它確實需要兩個subtractByKey
操作,這可能會很慢。此外,此方法不保留順序(例如,((0, 1), 2)
,儘管在您的第一個數據集中是第一個,在最終數據集中是第二個)。但我無法想象這很重要。
至於哪個更快,我認爲這取決於您的cartersian加入需要多長時間。映射和過濾往往比subtractByKey
所需的混洗操作更快,但當然cartesian
是一個耗時的過程。
無論如何,我想你可以嘗試這種方法,看看它是否適合你!
性能改進的旁註,取決於RDD的大小。
如果rdd1
足夠小,可以保存在主存儲器中,如果您廣播它,然後流反對它,減法過程可以非常迅速。但是,我承認這種情況很少。
非常感謝。我想,應該是按照關鍵結果對RDD進行分類的一種方法。再次感謝你。 – Guforu
我也建議看看'十字路口'。現在看來它只適用於整行匹配的情況,這對您的用例不起作用,但我想知道您是否可以操縱'intersection'來僅處理鍵值對的交集。如果我弄清楚,我會更新我的答案。 –
十字路口是一點點其他故事,我真的找鑰匙的交叉點,而不是價值觀(當然你也告訴我)。但我會花一點時間分析你的建議。謝謝 – Guforu