2016-07-21 55 views
0

我正在嘗試使用mapGroups執行聚合,它返回SparseMatrix作爲其中一列,然後對列進行求和。如何在Spark數據集中創建一個TypedColumn並對其進行操作?

我爲映射行創建了一個case class模式以提供列名稱。矩陣列輸入org.apache.spark.mllib.linalg.Matrix。如果在執行彙總(select(sum("mycolumn"))之前未運行toDF,則會出現一個類型不匹配錯誤(required: org.apache.spark.sql.TypedColumn[MySchema,?])。如果我包括toDF,我會得到另一個類型不匹配錯誤:cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDT。那麼,正確的做法是什麼?

回答

1

看起來你至少在這裏遇到兩個不同的問題。讓我們假設你有Dataset這樣的:使用o.a.s.sql.functions.col

ds.select(col("_1").as[String]) 
  • val ds = Seq(
        ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))), 
        ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))) 
    ).toDS 
    

    選擇TypedColumn:使用隱式轉換

    • $

      ds.select(col("_1").as[String]) 
      

    添加矩陣:

    • MLLib MatrixMatrixUDT不執行加法。這意味着你將無法sum功能或+
    • 減少你可以使用第三方的線性代數庫,但在此不星火SQL支持/星火數據集

    如果你真的想這樣做與Datsets你可以嘗試做這樣的事情:

    ds.groupByKey(_._1).mapGroups(
        (key, values) => { 
        val matrices = values.map(_._2.toArray) 
        val first = matrices.next 
        val sum = matrices.foldLeft(first)(
         (acc, m) => acc.zip(m).map { case (x, y) => x + y } 
        ) 
        (key, sum) 
    }) 
    

    ,並映射回矩陣但我個人只想轉換爲RDD並使用breeze

  • +0

    謝謝。請你可以建議一個解決方案的附加問題?這就是我現在卡住的地方。 – Emre

    +0

    矩陣密集或稀疏嗎?大小是多少? – zero323

    +0

    相當小而稀疏;小到足以放在節點上。 – Emre

    相關問題