1
我使用Spark MLLib k-Means,它要求功能具有相同的尺寸。這些特徵是使用直方圖計算的,所以我必須使用固定大小的箱子。 Hive具有內置函數histogram_numeric(col,b) - 使用b個非均勻間隔的bin計算組中數值列的直方圖。什麼是最好的方法以及如何在直方圖中使用b個固定大小的容器?如何在Hive中的直方圖中使用固定大小的大小?
我使用Spark MLLib k-Means,它要求功能具有相同的尺寸。這些特徵是使用直方圖計算的,所以我必須使用固定大小的箱子。 Hive具有內置函數histogram_numeric(col,b) - 使用b個非均勻間隔的bin計算組中數值列的直方圖。什麼是最好的方法以及如何在直方圖中使用b個固定大小的容器?如何在Hive中的直方圖中使用固定大小的大小?
一個處理這種可能的方式是創建一個UDF這樣
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
def get_bucket(breaks: Array[Double]) = udf(
(x: Double) =>
scala.math.abs(java.util.Arrays.binarySearch(breaks, x) + 1))
讓我們假設你的數據看起來與此類似:
val df = sc.parallelize(Seq(
(1, 1.0), (1, 2.3), (1, 0.4), (1, 2.1), (1, 3.5), (1, 9.0),
(2, 3.6), (2, 0.2), (2, 0.6), (2, 0.1), (2, 4.0), (2, -1.0)
)).toDF("k", "v")
其中k
標識點和v
是你的值想用來計算直方圖。
val breaks = Array(0.0, 1.0, 2.0, 3.0, 4.0)
val dfWithBuckets = df
.withColumn("bucket", get_bucket(breaks)($"v"))
.groupBy($"k", $"bucket")
.agg(count(lit(1)))
dfWithBuckets.show()
// +---+------+--------+
// | k|bucket|count(1)|
// +---+------+--------+
// | 1| 1| 1|
// | 1| 2| 1|
// | 1| 3| 2|
// | 1| 4| 1|
// | 1| 5| 1|
// | 2| 0| 1|
// | 2| 1| 3|
// | 2| 4| 1|
// | 2| 5| 1|
// +---+------+--------+
最後上面的數據可以被收集,歸納,並轉化爲向量:
import org.apache.spark.mllib.linalg.Vectors
def toVector(xs: Iterable[(Int, Long)], n: Int) = {
val sorted = xs.toArray.sorted
val indices = sorted.map(_._1)
val values = sorted.map(_._2.toDouble)
Vectors.sparse(n, indices, values)
}
val vectors = dfWithBuckets.map{
case Row(k: Int, b: Int, cnt: Long) =>
(k, (b, cnt))}
.groupByKey
.mapValues(vs => toVector(vs, breaks.size + 1))
vectors.collect
// Array[(Int, org.apache.spark.mllib.linalg.Vector)] = Array(
// (1,(6,[1,2,3,4,5],[1.0,1.0,2.0,1.0,1.0])),
// (2,(6,[0,1,4,5],[1.0,3.0,1.0,1.0])))
感謝您的有益的建議!如果沒有其他選擇,我會使用你的方法。在我的情況下,我有:'myDfWithBuckets.groupBy($「bucket」)。agg(callUDF(「histogram_numeric」,$「col1」,lit(n)))''。如果n是固定大小的容器,那麼我就完成了。 histogram_numeric的問題是,在n = 40的情況下,有時我只回收12個桶,有時20個桶。我需要固定數量的桶。 – wdz