2017-04-10 54 views
0

我有一個包含這樣Scala - 如何迭代RDD上的元組?

元組的RDD(A,列表(-2,5,6,7-))

(B,列表(2,8,9,10))

我想獲得第一個元素的索引,其中值和索引之間的特定條件成立。 到目前爲止,我已經試過這對一個元組測試,它工作得很好:

test._2.zipWithIndex.indexWhere { case (v, i) => SOME_CONDITION} 

我只是找不到如何遍歷列表中的所有元組。我曾嘗試:

val result= test._._2.zipWithIndex.indexWhere { case (v, i) => SOME_CONDITION} 

回答

4

首先,「迭代」在這裏是一個錯誤的概念 - 它來自命令式編程的領域,您實際上在自己的數據結構上迭代。 Spark使用功能範例,讓您通過函數來處理RDD中的每個記錄(使用一些高階函數,如map,foreach ...)。

在這種情況下,聽起來像是想將中的每個元素都放到一個新的元素中。

僅僅映射你的元組的右手邊(不改變左側),你可以使用mapValues

// mapValues will map the "values" (of type List[Int]) to new values (of type Int) 
rdd.mapValues(list => list.zipWithIndex.indexWhere { 
    case (v, i) => someCondition(v, i) 
}) 

,或者,使用普通map

rdd.map { 
    case (key, list) => (key, list.zipWithIndex.indexWhere { 
    case (v, i) => someCondition(v, i) 
    }) 
} 
+0

感謝您的迴應!我得到這個錯誤'價值indexWhere不是Iterable成員[(Int,Int)]' – lacrima

+0

我假設你的輸入RDD的類型是'RDD [(String,List [Int])]] - 是不是案件?如果是這樣 - 它是什麼? –

+0

我會說這是,但我可能是錯誤的。名單成員來自wordcount。有沒有辦法檢查對象的類型?或者明確定義它的類型? – lacrima