2016-12-28 65 views
-1

我有RDD的鍵/值對,對於我需要調用某個接受RDD的函數的每個鍵。所以我嘗試了RDD.Map和內部映射使用sc.parallelize(value)方法創建了RDD,並將此rdd發送給我的函數,但由於Spark不支持在RDD內創建RDD,因此這不起作用。如何在地圖功能中創建RDD

您能否就這種情況向我建議任何解決方案?

我正在尋找解決方案,建議在下面的線程,但我遇到的問題是我的鑰匙沒有固定,我可以有任何數量的鑰匙。
How to create RDD from within Task?

感謝

+1

沒有通用的解決方案。無法從地圖中調用RDD。如果你提供一些你的邏輯代碼,可能會提出一個合適的改變。 –

+3

這聽起來像[XY問題](http://meta.stackexchange.com/questions/66377/what-is-the-xy-problem)。你爲什麼依賴於你自己的函數邏輯中的'RDD'? –

+0

你想用Spark實現什麼?你能否描述你的用例(而不是你如何使用Spark)? –

回答

0

不,你不應該內RDD創建RDD。

取決於數據的大小,可能有兩種解決方案:

1)如果有多個鍵,每個鍵有沒有太多的價值。將接受RDD的函數轉換爲接受Iterable的函數。那麼你可以做一些像

// rdd: RDD[(keyType, valueType)] 
rdd.groupByKey() 
    .map { case (key, values) => 
    func(values) 
    } 

2)如果有幾個鍵,每個鍵有很多值。那麼你不應該做一個組,因爲它會收集一個執行器的密鑰的所有值,這可能會導致OutOfMemory。相反,運行每個鍵的作業,如

rdd.keys.distinct().collect() 
    .foreach { key => 
    func(rdd.filter(_._1 == key))   
    } 
1

這聽起來不太對勁。如果函數需要處理鍵值對,它應該接收該對作爲參數,而不是RDD。

但是,如果您真的想將RDD作爲參數發送,而不是在鏈操作中,則可以在預處理後創建引用並將該引用發送給該方法。