2015-11-18 41 views
1

我使用Spark MLLib k-Means,它要求功能具有相同的尺寸。這些特徵是使用直方圖計算的,所以我必須使用固定大小的箱子。 Hive具有內置函數histogram_numeric(col,b) - 使用b個非均勻間隔的bin計算組中數值列的直方圖。什麼是最好的方法以及如何在直方圖中使用b個固定大小的容器?如何在Hive中的直方圖中使用固定大小的大小?

回答

1

一個處理這種可能的方式是創建一個UDF這樣

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions._ 

def get_bucket(breaks: Array[Double]) = udf(
    (x: Double) => 
    scala.math.abs(java.util.Arrays.binarySearch(breaks, x) + 1)) 

讓我們假設你的數據看起來與此類似:

val df = sc.parallelize(Seq(
    (1, 1.0), (1, 2.3), (1, 0.4), (1, 2.1), (1, 3.5), (1, 9.0), 
    (2, 3.6), (2, 0.2), (2, 0.6), (2, 0.1), (2, 4.0), (2, -1.0) 
)).toDF("k", "v") 

其中k標識點和v是你的值想用來計算直方圖。

val breaks = Array(0.0, 1.0, 2.0, 3.0, 4.0) 

val dfWithBuckets = df 
    .withColumn("bucket", get_bucket(breaks)($"v")) 
    .groupBy($"k", $"bucket") 
    .agg(count(lit(1))) 

dfWithBuckets.show() 
// +---+------+--------+ 
// | k|bucket|count(1)| 
// +---+------+--------+ 
// | 1|  1|  1| 
// | 1|  2|  1| 
// | 1|  3|  2| 
// | 1|  4|  1| 
// | 1|  5|  1| 
// | 2|  0|  1| 
// | 2|  1|  3| 
// | 2|  4|  1| 
// | 2|  5|  1| 
// +---+------+--------+ 

最後上面的數據可以被收集,歸納,並轉化爲向量:

import org.apache.spark.mllib.linalg.Vectors 

def toVector(xs: Iterable[(Int, Long)], n: Int) = { 
    val sorted = xs.toArray.sorted 
    val indices = sorted.map(_._1) 
    val values = sorted.map(_._2.toDouble) 
    Vectors.sparse(n, indices, values) 
} 

val vectors = dfWithBuckets.map{ 
    case Row(k: Int, b: Int, cnt: Long) => 
    (k, (b, cnt))} 
    .groupByKey 
    .mapValues(vs => toVector(vs, breaks.size + 1)) 

vectors.collect 
// Array[(Int, org.apache.spark.mllib.linalg.Vector)] = Array(
// (1,(6,[1,2,3,4,5],[1.0,1.0,2.0,1.0,1.0])), 
// (2,(6,[0,1,4,5],[1.0,3.0,1.0,1.0]))) 
+0

感謝您的有益的建議!如果沒有其他選擇,我會使用你的方法。在我的情況下,我有:'myDfWithBuckets.groupBy($「bucket」)。agg(callUDF(「histogram_numeric」,$「col1」,lit(n)))''。如果n是固定大小的容器,那麼我就完成了。 histogram_numeric的問題是,在n = 40的情況下,有時我只回收12個桶,有時20個桶。我需要固定數量的桶。 – wdz