2016-10-27 90 views
0

在apache spark中,可以使用sparkContext.union()方法高效地合併多個RDD。如果有人想要交叉多個RDD,有沒有類似的東西?我已經在sparkContext方法中搜索過,並且找不到任何東西或其他地方。一種解決方案可能是聯合rdds然後檢索重複項,但我認爲它不會那麼高效。假設我有一個鍵/值對集合下面的例子:Apache Spark - 多個RDD的交集

val rdd1 = sc.parallelize(Seq((1,1.0),(2,1.0))) 
val rdd2 = sc.parallelize(Seq((1,2.0),(3,4.0),(3,1.0))) 

我想找回一個新的集合,其具有以下元素:

(1,2.0) (1,1.0) 

但當然多RDDS,而不是隻有兩個。

+0

你爲什麼要交叉多個rdds?並基於什麼? – Shankar

+0

我想現在我的問題更好地理解。 –

回答

2

嘗試:

val rdds = Seq(
    sc.parallelize(Seq(1, 3, 5)), 
    sc.parallelize(Seq(3, 5)), 
    sc.parallelize(Seq(1, 3)) 
) 
rdds.map(rdd => rdd.map(x => (x, None))).reduce((x, y) => x.join(y).keys.map(x => (x, None))).keys 
+0

工作,謝謝。但是,如果每個集合都具有鍵/值對而不是整數,那就無法正常工作,對吧?此外,此方法使用連接。通常,散列分區器是一個很好的習慣,對吧? –

+0

只要元素可以被散列,就應該工作。除非你想要一些不同的輸出。不明白第二個問題。 –

+0

在rdds之間使用連接之前的一個好做法是使用Hash分區器來避免冗餘重組,並使其更有效。在你的代碼中你不使用任何散列分區。 –

2

有上RDD的intersection method,但是隻需要一個其他RDD:

def intersection(other: RDD[T]): RDD[T] 

讓我們實現你在這一個方面想方法。

def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = { 
    rdds.reduce { case (left, right) => left.intersection(right) 
} 

如果你已經看了星火實施的連接,您可以通過將最大RDD第一優化執行:

def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = { 
    rdds.sortBy(rdd => -1 * rdd.partitions.length) 
    .reduce { case (left, right) => left.intersection(right) 
} 

編輯:它看起來像我誤解你比如:你的文字看起來你正在尋找rdd.union的逆向行爲,但你的例子意味着你想通過鍵來相交。我的回答並沒有解決這個問題。