2015-12-31 62 views
3

有一個矩陣,我想用矢量執行它的點積。以下是Scala代碼:如何使用Spark的RDD執行帶有矢量的矩陣點積產品

val matrix = sc.parallelize(List(
    (("v1","v1"),2),(("v1","v2"),4),(("v1","v3"),1),(("v2","v2"),5), 
    (("v2","v3"),1),(("v3","v3"),2))) 

val vector = sc.parallelize(List(("v1",4),("v2",1),("v3",5))) 

val dotproduct = matrix.flatMap{x => { 
    vector.flatMap { y => { 
    if(x._1._2 == y._1) Tuple2(x._1._1, x._2 * y._2) 
    }} 
}}.reduceByKey((_,_) => _+_) 

但是發生下列錯誤:

<console>:25: error: type mismatch; 
found : (String, Int) 
required: TraversableOnce[?] 
val dotproduct = matrix.flatMap{ x => { vector.flatMap { y => { if(x._1._2 == y._1) (x._1._1, x._2 * y._2) }}}}.reduceByKey((_,_) => _+_) 
                         ^

我不知道,如果在RDD嵌套操作就可以了。 Spark MLlib是否提供任何API來執行矩陣和向量之間的點積?

+0

矩陣是對稱的,所以只有我使用對角線上方的元素。 – victorming888

回答

2

假設由點積你的意思只是普通的矩陣向量乘法,您可以使用multiply方法從mllib.linalg

val mlMat=Matrices.dense(3,2,matrix.collect().map(_._2.toDouble)).transpose 
val mlVect=Vectors.dense(vector.collect().map(_._2.toDouble)) 
mlMat.multiply(mlVect) 
//org.apache.spark.mllib.linalg.DenseVector = [17.0,31.0] 
3

我不知道,如果在嵌套操作RDD是好的。

這是不行的。 Spark不支持嵌套操作,轉換或分佈式數據結構。

Spark MLlib是否提供任何API來執行矩陣和向量之間的點積?

RowMatrix規定multiply接受局部矩陣的方法。它應該適用於你的情況。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} 

val idx = "^v([0-9]+)$".r 

val rdd = sc.parallelize(List(
    (("v1", "v1"), 2), (("v1", "v2"), 4), 
    (("v1", "v3"), 1), (("v2", "v2"), 5), 
    (("v2", "v3"), 1), (("v3", "v3"), 2) 
)) 

val mat = new CoordinateMatrix(rdd.map { case ((idx(i), idx(j)), v) => 
    MatrixEntry(i.toLong - 1, j.toLong - 1, v.toDouble) 
}).toIndexedRowMatrix 

val vector = Matrices.dense(3, 1, Array(4.0, 1.0, 5.0)) 
mat.multiply(vector).rows 

如果要在內存中處理較大的向量,則可以使用塊矩陣。見Matrix Multiplication in Apache Spark

關於你的代碼,你可以例如做這樣的事情:

matrix 
    .map{case ((i, j), v) => (j, (i, v))} 
    .join(vector) 
    .values 
    .map{case ((i, v1), v2) => (i, v1 * v2)} 
    .reduceByKey(_ + _) 

或與當地的「載體」(可選播出):

val vector = Map(("v1" -> 4), ("v2" -> 1), ("v3" -> 5)).withDefault(_ => 0) 

matrix.map{case ((i, j), v) => (i, v * vector(j))}.reduceByKey(_ + _) 
+0

謝謝您澄清,zero323! – victorming888