Maybe it's just because I'm relatively new to the API, but I feel Spark ML methods often return DataFrames that are unnecessarily hard to work with. How do I flatten a column that holds an array of structs (as returned by the Spark ML API)?
This time it's the ALS model that's tripping me up, specifically the recommendForAllUsers method. Let's reconstruct the type of DataFrame it returns:
scala> import org.apache.spark.sql.types._
scala> val arrayType = ArrayType(new StructType().add("itemId", IntegerType).add("rating", FloatType))
scala> val recs = Seq((1, Array((1, .7), (2, .5))), (2, Array((0, .9), (4, .1)))).
         toDF("userId", "recommendations").
         select($"userId", $"recommendations".cast(arrayType))
scala> recs.show()
+------+------------------+
|userId| recommendations|
+------+------------------+
| 1|[[1,0.7], [2,0.5]]|
| 2|[[0,0.9], [4,0.1]]|
+------+------------------+
scala> recs.printSchema
root
|-- userId: integer (nullable = false)
|-- recommendations: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- itemId: integer (nullable = true)
| | |-- rating: float (nullable = true)
Now, all I care about is the itemId in the recommendations column. After all, the method is recommendForAllUsers, not recommendAndScoreForAllUsers (OK, OK, I'll stop being cheeky...).
How do I do this?
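To make the goal concrete, the result I'm after would look something like this (items is just my placeholder name for the new column):

```
+------+------+
|userId| items|
+------+------+
|     1|[1, 2]|
|     2|[0, 4]|
+------+------+
```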
I thought I had it when I created a UDF:
scala> val itemIds = udf((arr: Array[(Int, Float)]) => arr.map(_._1))
But it produces an error:
scala> recs.withColumn("items", itemIds($"recommendations"))
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(recommendations)' due to data type mismatch: argument 1 requires array<struct<_1:int,_2:float>> type, however, '`recommendations`' is of array<struct<itemId:int,rating:float>> type.;;
'Project [userId#87, recommendations#92, UDF(recommendations#92) AS items#238]
+- Project [userId#87, cast(recommendations#88 as array<struct<itemId:int,rating:float>>) AS recommendations#92]
+- Project [_1#84 AS userId#87, _2#85 AS recommendations#88]
+- LocalRelation [_1#84, _2#85]
Any ideas? Thanks!
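Update: here's a sketch of a workaround that seems plausible, though I haven't verified it against the real ALS output. The error suggests Spark hands an array-of-struct column to a Scala UDF as a Seq[Row], not as a Seq of tuples, so the UDF should unpack Rows by field name:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Structs arrive inside a Scala UDF as Row values, so pull out the
// field by name rather than pattern-matching on a tuple.
val itemIds = udf((recs: Seq[Row]) => recs.map(_.getAs[Int]("itemId")))
recs.withColumn("items", itemIds($"recommendations"))

// Alternatively, dotted field access on an array-of-struct column
// appears to project each element's field, yielding array<int> directly:
recs.select($"userId", $"recommendations.itemId".as("items"))
```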