2017-03-31 75 views
1

我有一個以地圖爲元素的RDD。當然,我不能使用RDD.get。因此,截至目前,我這樣做,以獲取值從這個映射鍵:從scala獲取地圖rdd中的密鑰值

val x = RDD.collect().flatten.toMap 

然後

x.get(key) 

得到的關鍵價值。現在,有一個非常大的rdd,它在rdd上應用.collect()時輸出錯誤java.lang.OutOfMemoryError: GC overhead limit exceeded。如何在不應用rdd上的.collect()的情況下執行此操作?

+0

你能共享一個可重複包括預期產出的例子? – mtoto

+0

跳到我身上的一件事是,你打電話收集(一個「行動」)太早。您需要將RDD轉換爲(希望的)更小的RDD - 基本上只需要那些具有所需鍵的元素 - 然後在RDD中只有少數元素的最後時刻調用collect。 – Phasmid

回答

0

既然不能容納一切到你的驅動程序,您首先需要篩選RDD你需要尋找到地圖中,然後執行得到...

val rdd = sc.parallelize(List(Map("a"->1,"b"->2),Map("c"->3,"d"->4))) 

val key = "d" 

val filteredRDD = rdd.filter(_.keySet contains key) 

if (!filteredRDD.isEmpty) filteredRDD.first.get(key) else None 
2

如果它是真正的Map當時的你能做到以下幾點:

rdd.flatMap(identity).lookup(key) 

雖然這仍然會向駕駛員輸出,而是來自鍵只值。所以,如果這可以適應記憶,那麼你就很好。但是,如果你想用它作爲RDD工作還是那麼:

rdd.flatMap(identity) 
    .flatMap{case (key, value) => if(key == myKey) Some(value) else None} 

,應該你想要鍵和值,那麼你可以把flatMap成過濾,只是過濾的key == myKey

+0

我不知道'lookup',這顯然是在這裏使用的正確功能。然而,你的第二個'flatMap'看起來很像一個'filter' ... –

+0

謝謝@CyrilleCorpet我修改了答案以明確爲什麼我選擇了flatMap,但爲了防止出現過濾標註。 –

+0

@JustinPihony,上面的方法可行,但接下來我必須在另一個rdd裏面使用「rdd.flatMap(identity).lookup(key)」來說rdd2。對於rdd2的每個元素,我必須在「rdd」中查找它的值。拋出的錯誤是,這個RDD缺少一個SparkContext。它可能發生在以下情況:(1)RDD轉換和操作不由驅動程序調用,而是在其他轉換中;例如,rdd1.map(x => rdd2.values.count()* x)是無效的,因爲值轉換和計數操作不能在rdd1.map轉換中執行 –