2017-03-09 33 views
1

添加矢量我有兩個RDDS具有這種結構存在於兩個不同的RDDS階火花

org.apache.spark.rdd.RDD[(Long, org.apache.spark.mllib.linalg.Vector)] 

這裏RDD的每一行包含一個索引Long和向量​​org.apache.spark.mllib.linalg.Vector。 我想將Vector的每個組件添加到存在於其他RDD行中的其他Vector的對應組件中。第一個RDD的每個矢量應該被添加到其他RDD的每個矢量。

一個例子是這樣的:

RDD1集:

Array[(Long, org.apache.spark.mllib.linalg.Vector)] = 
     Array((0,[0.1,0.2]),(1,[0.3,0.4])) 

RDD2:

Array[(Long, org.apache.spark.mllib.linalg.Vector)] = 
     Array((0,[0.3,0.8]),(1,[0.2,0.7])) 

結果:

Array[(Long, org.apache.spark.mllib.linalg.Vector)] = 
Array((0,[0.4,1.0]),(0,[0.3,0.9]),(1,[0.6,1.2]),(1,[0.5,1.1])) 

回答

0

請使用列表,而不是考慮同樣的情況的Ar射線。

這裏是我的解決方案:

val l1 = List((0,List(0.1,0.2)),(1,List(0.1,0.2))) 
    val l2 = List((0,List(0.3,0.8)),(1,List(0.2,0.7))) 
    var sms = (l1 zip l2).map{ case (m, a) => (m._1, (m._2, a._2).zipped.map(_+_))} 

讓我們的實驗陣列:)

0

,而不是驅動程序代碼,你可以做到這一切的轉變。如果你有大的rdds,這將會很有幫助。這也會執行更少的洗牌。

val a:RDD[(Long, org.apache.spark.mllib.linalg.Vector)]= sc.parallelize(Array((0l,Vectors.dense(0.1,0.2)),(1l,Vectors.dense(0.3,0.4)))) 

val b:RDD[(Long, org.apache.spark.mllib.linalg.Vector)]= sc.parallelize(Array((0l,Vectors.dense(0.3,0.8)),(1l,Vectors.dense(0.2,0.7)))) 

val ab= a join b 

val result=ab.map(x => (x._1,Vectors.dense(x._2._1.apply(0)+x._2._2.apply(0),x._2._1.apply(1)+x._2._2.apply(1)))) 
+0

結果不正確,我想將第一個RDD中的每個向量添加到其他RDD的每個向量中,如示例中所述。 –

+0

讓我看看你的進步..我們不是在這裏代表你寫代碼..顯示你的進度加上你的問題你卡在哪裏然後我們會幫你 –

+0

非常感謝你,我已經解決了這個問題。 –