2015-04-14 97 views
2

我是新來的火花,我非常喜歡這種技術提供的可能性。我的問題是如何在不使用for循環的情況下爲每個元素使用一個元素執行RDD其餘部分的操作。 這裏是我試圖用一個for循環:對RDD的其餘部分執行一個元素的操作

//RDD[Key:Int,Vector:(Double,Double)] 
val rdd = data.map(x => (x.split(',')(0).toInt,Vectors.dense(x.split(',')(1).toDouble,x.split(',')(2).toDouble))) 

for(ind <- 0 to rdd.count().toInt -1) { 
    val element1 = rdd.filter(x => x._1 == ind) 
    val vector1 = element1.first()._2 
    val rdd2 = rdd.map(x => { 
     var dist1 = Vectors.sqdist(x._2,vector1)  
     (x._1 , Math.sqrt(dist1)) 
     }) 
} 

謝謝您的建議

回答

1

如果你正在尋找發現所有向量之間的距離,使用rdd.cartesian

import org.apache.spark.mllib.linalg.Vectors 

val rdd = sc.parallelize(Array("0,1.0,1.0","1,2.0,2.0","2,3.0,3.0")) 
val r = rdd.map(x => x.split(",")) 
      .map(y =>(y(0).toInt, Vectors.dense(y(1).toDouble, y(2).toDouble))) 

val res = r.cartesian(r).map{ case (first, second) => 
    ((first._1, second._1), 
    Math.sqrt(Vectors.sqdist(first._2, second._2))) 
} 

但是它計算,兩次相同向量之間的距離。 (第一(A,B),然後(B,A))

相關問題