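I have a second question about CosineSimilarity/ColumnSimilarities in Spark 2.1. I am new to Scala and the whole Spark environment, and this is not really clear to me: how do I turn the result of columnSimilarities() back into a Spark DataFrame?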

How can I get the column similarities for every combination of columns from a RowMatrix in Spark? Here is my attempt:

Data:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, Row, DataFrame}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
import org.apache.spark.sql.functions._

// RDD of rows: each row holds the values for item_1, item_2, item_3
val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row(2.0, 7.0, 1.0),
    Row(3.5, 2.5, 0.0),
    Row(7.0, 5.9, 0.0)
  )
)

// Schema
val schema = new StructType()
  .add(StructField("item_1", DoubleType, true))
  .add(StructField("item_2", DoubleType, true))
  .add(StructField("item_3", DoubleType, true))

// Data frame
val df = spark.createDataFrame(rowsRdd, schema)

Code:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix, RowMatrix}

// Assemble all columns into a single vector column and drop down to an RDD[Row]
val rows = new VectorAssembler().setInputCols(df.columns).setOutputCol("vs")
  .transform(df)
  .select("vs")
  .rdd

// Convert the ml vectors to mllib vectors, which RowMatrix expects
val items_mllib_vector = rows.map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
  .map(org.apache.spark.mllib.linalg.Vectors.fromML)
val mat = new RowMatrix(items_mllib_vector)

// Exact cosine similarity between every pair of columns
val simsPerfect = mat.columnSimilarities()

println("Pairwise similarities are: " + simsPerfect.entries.collect.mkString(", "))

Output:

Pairwise similarities are: MatrixEntry(0,2,0.24759378423606918), MatrixEntry(1,2,0.7376189553526812), MatrixEntry(0,1,0.8355316482961213) 
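
For readers who want to check these numbers by hand, here is a minimal plain-Scala sketch (no Spark) that computes the cosine similarity between each pair of columns of the same data. The names `columns`, `cosine`, and `sims` are made up for illustration. Each value should agree with the corresponding MatrixEntry above up to floating-point rounding, where entry (i, j) refers to the zero-based column indices.

```scala
// Columns of the question's data, one Seq per column (item_1, item_2, item_3).
val columns: Seq[Seq[Double]] = Seq(
  Seq(2.0, 3.5, 7.0),
  Seq(7.0, 2.5, 5.9),
  Seq(1.0, 0.0, 0.0)
)

// Cosine similarity: dot(a, b) / (||a|| * ||b||).
def cosine(a: Seq[Double], b: Seq[Double]): Double = {
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val normA = math.sqrt(a.map(x => x * x).sum)
  val normB = math.sqrt(b.map(x => x * x).sum)
  dot / (normA * normB)
}

// One entry per unordered pair (i < j), i.e. only the upper triangle.
val sims: Seq[(Int, Int, Double)] = for {
  i <- columns.indices
  j <- columns.indices if j > i
} yield (i, j, cosine(columns(i), columns(j)))

sims.foreach { case (i, j, s) => println(s"($i, $j) -> $s") }
```

Note that only pairs with i < j appear, which matches the upper-triangular CoordinateMatrix that columnSimilarities() returns.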

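So what I get is simsPerfect, an org.apache.spark.mllib.linalg.distributed.CoordinateMatrix holding my column indices and their similarities. How would I convert it back to a DataFrame and get the correct column names?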

My preferred output:

item_from | item_to | similarity
        1 |       2 |       0.83
        1 |       3 |       0.24
        2 |       3 |       0.73

Thanks in advance.

Answers

I found a solution to my problem:

//Transform result to rdd 
val transformedRDD = simsPerfect.entries.map{case MatrixEntry(row: Long, col:Long, sim:Double) => Array(row,col,sim).mkString(",")} 

//Transform rdd[String] to rdd[Row] 
val rdd2 = transformedRDD.map(a => Row(a)) 

// to DF 
val dfschema = StructType(Array(StructField("value",StringType))) 
val rddToDF = spark.createDataFrame(rdd2,dfschema) 

//create new DF with schema 
val newdf = rddToDF.select(expr("(split(value, ','))[0]").cast("string").as("item_from") 
       ,expr("(split(value, ','))[1]").cast("string").as("item_to") 
       ,expr("(split(value, ','))[2]").cast("string").as("sim")) 

I'm sure there is a simpler way to do this, but I'm glad it works.

This approach also works, without converting each row to a string:

val transformedRDD = simsPerfect.entries.map{case MatrixEntry(row: Long, col:Long, sim:Double) => (row,col,sim)} 
val dff = sqlContext.createDataFrame(transformedRDD).toDF("item_from", "item_to", "sim") 

Here I assume that val sqlContext = new org.apache.spark.sql.SQLContext(sc) has already been defined and that sc is the SparkContext.
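
The question also asked for the original column names rather than matrix indices. Here is a minimal sketch of one way to do that, meant to be pasted into spark-shell. It assumes that the RowMatrix columns are in the same order as df.columns, which holds when the vectors are built from the columns in that order. The local SparkSession settings and the names `vectors`, `sims`, `names`, and `named` are illustrative, not part of the original posts.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell this reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("column-sims").getOrCreate()
import spark.implicits._

// Same data and column names as in the question.
val df = Seq((2.0, 7.0, 1.0), (3.5, 2.5, 0.0), (7.0, 5.9, 0.0))
  .toDF("item_1", "item_2", "item_3")

// Build the RowMatrix from the numeric columns, in df.columns order.
val vectors = df.rdd.map(r => Vectors.dense(r.toSeq.map(_.asInstanceOf[Double]).toArray))
val sims = new RowMatrix(vectors).columnSimilarities()

// Matrix column i corresponds to df.columns(i), so each index can be swapped for its name.
val names = df.columns
val named = sims.entries
  .map { case MatrixEntry(i, j, sim) => (names(i.toInt), names(j.toInt), sim) }
  .toDF("item_from", "item_to", "similarity")

named.orderBy("item_from", "item_to").show()
```

If plain 1-based numbers are preferred, as in the desired output of the question, mapping to (i + 1, j + 1, sim) instead of the names should give that shape.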