12

I have a DataFrame with two columns: ID of type Int and Vec of type Vector (org.apache.spark.mllib.linalg.Vector). How can I define a custom aggregation function to sum up a column of vectors?

The DataFrame looks like this:

ID,Vec 
1,[0,0,5] 
1,[4,0,1] 
1,[1,2,1] 
2,[7,5,0] 
2,[3,3,4] 
3,[0,8,1] 
3,[0,0,1] 
3,[7,7,7] 
.... 

I would like to do a groupBy($"ID") and then apply an aggregation that sums the vectors of the rows within each group.

For the example above, the desired output would be:

ID,SumOfVectors 
1,[5,2,7] 
2,[10,8,4] 
3,[7,15,9] 
... 

The built-in aggregation functions will not work; for example, df.groupBy($"ID").agg(sum($"Vec")) results in a ClassCastException.

How can I implement a custom aggregation function that allows me to sum vectors or arrays, or to perform any other custom operation?

+3

[How can I define and use a user-defined aggregate function in Spark SQL?](http://stackoverflow.com/questions/32100973/how-can-i-define-and-use-a-user-defined-aggregate-function-in-spark-sql) –

Answers

19

Personally, I wouldn't bother with UDAFs. They are not only verbose but also not particularly fast. Instead, I would simply use reduceByKey/foldByKey:

import org.apache.spark.sql.Row
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.ml.linalg.{Vector, Vectors}

// Build the example DataFrame. Note that ml.linalg.Vectors has no parse
// method, so the vectors are constructed directly. Assumes spark-shell
// (spark.implicits._ already in scope).
val df = Seq(
    (1, Vectors.dense(0, 0, 5)), (1, Vectors.dense(4, 0, 1)),
    (1, Vectors.dense(1, 2, 1)), (2, Vectors.dense(7, 5, 0)),
    (2, Vectors.dense(3, 3, 4)), (3, Vectors.dense(0, 8, 1)),
    (3, Vectors.dense(0, 0, 1)), (3, Vectors.dense(7, 7, 7))
  ).toDF("id", "vec")

val aggregated = df
  .rdd
  // convert each Spark vector to a mutable breeze vector
  .map { case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
  // fold per key, starting from a zero vector; += adds in place
  // (the vector size, 3 here, has to be known up front)
  .foldByKey(BDV.zeros[Double](3))(_ += _)
  // convert back to Spark vectors
  .mapValues(v => Vectors.dense(v.toArray))
  .toDF("id", "vec")

aggregated.show 

// +---+--------------+ 
// | id|   vec| 
// +---+--------------+ 
// | 1| [5.0,2.0,7.0]| 
// | 2|[10.0,8.0,4.0]| 
// | 3|[7.0,15.0,9.0]| 
// +---+--------------+ 
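For completeness, the reduceByKey variant mentioned at the top looks almost the same. A sketch, reusing the imports and df from the snippet above (breeze's + allocates a new vector instead of mutating in place, so no zero value is needed):

val aggregatedR = df
  .rdd
  .map { case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
  .reduceByKey(_ + _)
  .mapValues(v => Vectors.dense(v.toArray))
  .toDF("id", "vec")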

And, just for comparison, a "simple" UDAF. Required imports:

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
    UserDefinedAggregateFunction} 
import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes} 
import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType} 
import org.apache.spark.sql.Row 
import scala.collection.mutable.WrappedArray 

The class definition:

class VectorSum(n: Int) extends UserDefinedAggregateFunction {
  def inputSchema = new StructType().add("v", SQLDataTypes.VectorType)
  def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
  def dataType = SQLDataTypes.VectorType
  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
    // start from an n-element zero array
    buffer.update(0, Array.fill(n)(0.0))
  }

  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0)) {
      val buff = buffer.getAs[WrappedArray[Double]](0)
      // iterate only over the non-zero entries of the input vector
      val v = input.getAs[Vector](0).toSparse
      for (i <- v.indices) {
        buff(i) += v(i)
      }
      buffer.update(0, buff)
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    // element-wise sum of two partial buffers
    val buff1 = buffer1.getAs[WrappedArray[Double]](0)
    val buff2 = buffer2.getAs[WrappedArray[Double]](0)
    for ((x, i) <- buff2.zipWithIndex) {
      buff1(i) += x
    }
    buffer1.update(0, buff1)
  }

  def evaluate(buffer: Row) = Vectors.dense(
    buffer.getAs[Seq[Double]](0).toArray)
}

And example usage:

df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show 

// +---+--------------+ 
// | id|   vec| 
// +---+--------------+ 
// | 1| [5.0,2.0,7.0]| 
// | 2|[10.0,8.0,4.0]| 
// | 3|[7.0,15.0,9.0]| 
// +---+--------------+ 
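The UDAF can also be called from SQL once it is registered by name. A sketch, where the name vec_sum and the temp view vectors are arbitrary choices:

spark.udf.register("vec_sum", new VectorSum(3))
df.createOrReplaceTempView("vectors")
spark.sql("SELECT id, vec_sum(vec) AS vec FROM vectors GROUP BY id").show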

See also: How to find mean of grouped Vector columns in Spark SQL?

+0

I see the trick is to use breeze.linalg.DenseVector. Why does that work while mllib.linalg's dense vector does not? – Rami

+1

The problem is that the Scala version of `mllib.linalg.Vector` has no `+` method. – zero323
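(To illustrate that point: breeze vectors define elementwise arithmetic directly, which is what the foldByKey solution above relies on. A minimal sketch:)

import breeze.linalg.{DenseVector => BDV}

BDV(1.0, 2.0) + BDV(3.0, 4.0)  // DenseVector(4.0, 6.0)
// org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0) has no such + method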

+0

Can this not be done with DataFrames or SQL? – oluies

0

I suggest the following (works on Spark 2.0.2 onward). It could be optimized further, but it is quite nice as it is. The one thing you have to know in advance is the vector size, at the time you create the UDAF instance:

import org.apache.spark.ml.linalg._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class VectorAggregate(val numFeatures: Int)
    extends UserDefinedAggregateFunction {

  // the buffer keeps a sparse map from vector index to running sum
  private type B = Map[Int, Double]

  def inputSchema: StructType =
    StructType(StructField("vec", SQLDataTypes.VectorType) :: Nil)

  def bufferSchema: StructType =
    StructType(StructField("agg", MapType(IntegerType, DoubleType)) :: Nil)

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer.update(0, Map.empty[Int, Double])

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val zero = buffer.getAs[B](0)
    input match {
      // fold every (value, index) pair of the input vector into the map
      case Row(DenseVector(values)) =>
        buffer.update(0, values.zipWithIndex.foldLeft(zero) {
          case (acc, (v, i)) => acc.updated(i, v + acc.getOrElse(i, 0d))
        })
      case Row(SparseVector(_, indices, values)) =>
        buffer.update(0, values.zip(indices).foldLeft(zero) {
          case (acc, (v, i)) => acc.updated(i, v + acc.getOrElse(i, 0d))
        })
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // merge two partial maps by summing values index-wise
    val zero = buffer1.getAs[B](0)
    buffer1.update(0, buffer2.getAs[B](0).foldLeft(zero) {
      case (acc, (i, v)) => acc.updated(i, v + acc.getOrElse(i, 0d))
    })
  }

  def deterministic: Boolean = true

  def evaluate(buffer: Row): Any = {
    val Row(agg: B) = buffer
    val indices = agg.keys.toArray.sorted
    // build a sparse result; .compressed switches to dense when that is smaller
    Vectors.sparse(numFeatures, indices, indices.map(agg)).compressed
  }

  def dataType: DataType = SQLDataTypes.VectorType
}
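A usage sketch, assuming the same df with columns id and vec as in the accepted answer (the vector size passed to the constructor, 3 here, must match the data):

df.groupBy($"id").agg(new VectorAggregate(3)($"vec") alias "vec").show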