2017-10-12 98 views
-2

我對Spark和Scala頗爲陌生,並且擁有Java背景。我已經在haskell中做了一些編程,所以對函數式編程不是全新的。使用Scala中的RDD.map()執行嵌套循環

我正試圖完成某種形式的嵌套for循環。我有一個RDD,我想根據RDD中的每兩個元素來操作。僞碼(類似Java的)應該是這樣的:

// some RDD named rdd is available before this 
List list = new ArrayList(); 
for(int i = 0; i < rdd.length; i++){ 
    list.add(rdd.get(i)._1); 
    for(int j = 0; j < rdd.length; j++){ 
     if(rdd.get(i)._1 == rdd.get(j)._1){ 
     list.add(rdd.get(j)._1); 
     } 
    } 
} 
// Then now let ._1 of the rdd be this list 

我的斯卡拉解決方案(即不工作)是這樣的:

val aggregatedTransactions = joinedTransactions.map(f => { 
    var list = List[Any](f._2._1) 
    val filtered = joinedTransactions.filter(t => f._1 == t._1) 

    for(i <- filtered){ 
     list ::= i._2._1 
    } 

    (f._1, list, f._2._2) 
    }) 

我試圖實現把項目_2 ._1如果兩項中的._1都相同,則將它們列入列表中。 我知道我不能在另一個地圖功能中做任何濾鏡或貼圖功能。我讀過你可以通過連接實現這樣的事情,但是我不明白我是如何將這些項目放入列表或任何可用作列表的結構的。

你如何用RDD獲得這樣的效果?

+1

我認爲你需要說明你想要更準確地達到什麼目標(即,我不認爲Java代碼符合你的陳述意圖)。對於初學者來說,爲什麼你不使用大小寫類來定義你正在使用的對象呢? – josephpconley

+2

如果你第一次使用scala,我會強烈建議花一些時間玩scala,尤其是scala系列。希望這有助於 – Pavel

+0

我無法使用scala集合,因爲集合不能被序列化,因此會在spark系統上拋出一個錯誤(由於垃圾收集器超時運行..)。這確實是我的第一次嘗試。 –

回答

0

假設你輸入具有某些類型A, B形式RDD[(A, (A, B))],而預期的結果應該有形式RDD[A] - 不是List(因爲我們希望保持分佈式數據) - 這似乎做你的需要:

rdd.join(rdd.values).keys 

詳細

很難理解確切的輸入和預期的輸出,作爲數據結構(類型)既不明確陳述,和的要求不能很好地由碼爲例進行了說明。所以我會做出一些假設,並希望它能幫助您解決具體的問題。

對於完整的例子,我會假設:

  • 輸入RDD具有類型RDD[(Int, (Int, Int))]
  • 預期輸出的形式RDD[Int],並且將包含大量重複的 - 如果原始RDD具有「鍵」 X多次,每場比賽(在._2._1)將如果那是我們正在努力解決的情況下出現每X發生一次作爲關鍵

- 這join會解決這個問題:

// Some sample data, assuming all ints 
val rdd = sc.parallelize(Seq(
    (1, (1, 5)), 
    (1, (2, 5)), 
    (2, (1, 5)), 
    (3, (4, 5)) 
)) 

// joining the original RDD with an RDD of the "values" - 
// so the joined RDD will have "._2._1" as key 
// then we get the keys only, because they equal the values anyway 
val result: RDD[Int] = rdd.join(rdd.values).keys 

// result is a key-value RDD with the original keys as keys, and a list of matching _2._1 
println(result.collect.toList) // List(1, 1, 1, 1, 2) 
+0

我現在意識到我應該更具體地陳述我的問題。 我試圖實現的是一個持有聚合邊的RDD。 ._1是我想加入他們的關鍵。 結構是(id,(src,dest))。因此,我想實現一個源列表,同時每個RDD「行」只保留一個ID。 因此:(id,(list ,dest)) 我仍然不完全確定如何在查看代碼時實現此目的。你可以試着解釋一下嗎? –

+0

這聽起來與本文描述的非常不同(或者Java代碼如果工作的話會做什麼...),所以我建議你在這個評論中發佈一個* new *問題,加上一個樣例INPUT和預期的OUTPUT 。有人(也許是我!)然後可以提供幫助。 –