2015-04-06 55 views
2

我是新來的Scala和Spark。我試圖在地圖轉換過程中返回多個鍵值對。我的輸入數據是一個簡單的CSV文件。如何使用Spark的地圖轉換在Scala中返回多個鍵值對?

 
1, 2, 3 
4, 5, 6 
7, 8, 9 

我的Scala腳本如下所示。

class Key(_i:Integer, _j:Integer) { 
def i = _i 
def j = _j 
} 
class Val(_x:Double, _y:Double) { 
def x = _x 
def y = _y 
} 
val arr = "1,2,3".split(",") 
for(i <- 0 until arr.length) { 
val x = arr(i).toDouble 
for(j <- 0 until arr.length) { 
    val y = arr(j).toDouble 
    val k = new Key(i, j) 
    val v = new Val(x, y) 
    //note that i want to return the tuples, (k, v) 
} 
} 

我希望能夠使用for循環和數據結構上方返回的多個元組(K,V)。類似於下面的代碼。

val file = sc.textFile("/path/to/test.csv") 
file.map(line => { 
val arr = line.split(",") 
for(i <- 0 until arr.length) { 
    val x = arr(i).toDouble 
    for(j <- (i+1) until arr.length) { 
    val y = arr(j).toDouble 
    val k = new Index(i,j) 
    val v = new Val(x,y) 
    (k,v) 
    } 
} 
}).collect //reduceByKey is not there, reduce is there, but not what i want 

當我複製/粘貼上面的代碼到lambda表達式(和斯卡拉REPL shell中運行)我收到以下錯誤:

 
error: illegal start of simple expression 
val arr = line.split(",") 
^ 

我知道也,我仍然停留在命令式/程序式的編程思想,所以請耐心等待(以及Scala/Spark的新手)。

回答

3

你忘記了箭頭後的括號。如果它是一個簡單的表達式(一個表達式),則只能省略它們。編輯後

file.map(line => { 
    //multiple lines of code here 
}) 

完整的答案:

case class Index(i:Integer, j:Integer) 
case class Val(x:Double, y:Double) 

val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9")) 
data.flatMap(line=>{ 
val arr = line.split(",") 
val doubleSeq = for(i <- 0 until arr.length) yield { 
    val x = arr(i).toDouble 
    for(j <- (i+1) until arr.length) yield { 
    val y = arr(j).toDouble 
    val k = Index(i,j) 
    val v = Val(x,y) 
    (k,v) 
    } 
} 
doubleSeq.flatten 
}) 

有大量的問題實際上是:

  • 請注意,我改變了你的類是case類,因爲它們是序列化。否則,你將需要實現Serializable
  • 我改變mapflatMap,以及flatten編你的數組作爲一個flatMap仍然會留下一個內部數組。現在,兩者的組合將產生您的RDD[(Index, Val)],現在可以隱式使用reduceByKey
  • 我通過使用yield將您的for循環轉換爲for的理解。你得到一個最終的類型Unit因爲for環路的返回類型爲Unit
+0

你的建議幫助。現在錯誤消失了。但是,當我添加return語句時,返回(k,v),我得到以下內容:error:return outside method definition。 – 2015-04-06 15:19:28

+0

我沒有看到...不要在scala中返回,最後的語句是返回值。這將解決它我會認爲 – 2015-04-06 15:31:09

+0

你知道我可以檢查,看看lambda函數是否正確?當我做file.map(line => {...})。collect時,我所看到的只是Array [Unit] = Array((),(),...())。接下來我要做的就是用同一個鍵減少所有的值。但是,自動完成(點擊標籤)表明reduceByKey不是org.apache.spark.rdd.RDD [Unit]的成員。我仍然陷入了MapReduce的狀態。 – 2015-04-06 16:17:56

3

使用RDD.flatMapyieldfor循環列表:

val file = sc.textFile("/path/to/test.csv") 
file.flatMap { line => 
    val arr = line.split(",") 
    for { 
    i <- 0 until arr.length 
    j <- (i + 1) until arr.length 
    } yield { 
    val x = arr(i).toDouble 
    val y = arr(j).toDouble 
    val k = new Index(i, j) 
    val v = new Val(x, y) 
    (k, v) 
    } 
}.collect 
+0

Scala for for循環是神奇的。我從來沒有找到他們的文件,在這一點上,我害怕問。 – 2015-04-06 18:49:09

相關問題