2015-03-02 53 views

回答

1

將RDD轉換爲Double的RDD,然後使用.histogram(10)操作。見DoubleRDD ScalaDoc

+3

.histogram(bucketCount)發現了這個蜂巢UDAF不算百分點,這「計算的直方圖使用bucketCount桶的數量*在RDD的最小值和最大值之間均勻分佈「 – Dmitry 2016-04-04 21:11:39

19

您可以:

  1. 排序通過rdd.sortBy()
  2. 計算通過rdd.count的數據集的大小()
  3. 郵編索引,以方便檢索百分的
  4. 通過rdd.lookup()獲取想要的百分位數例如爲第10百分位rdd.lookup(0.1 *大小)

爲了計算中位數和第99百分位數: getPercentiles(RDD,新的雙[] {0.5,0.99},大小,numPartitions);

在Java 8:

public static double[] getPercentiles(JavaRDD<Double> rdd, double[] percentiles, long rddSize, int numPartitions) { 
    double[] values = new double[percentiles.length]; 

    JavaRDD<Double> sorted = rdd.sortBy((Double d) -> d, true, numPartitions); 
    JavaPairRDD<Long, Double> indexed = sorted.zipWithIndex().mapToPair((Tuple2<Double, Long> t) -> t.swap()); 

    for (int i = 0; i < percentiles.length; i++) { 
     double percentile = percentiles[i]; 
     long id = (long) (rddSize * percentile); 
     values[i] = indexed.lookup(id).get(0); 
    } 

    return values; 
} 

注意,這需要排序數據集,O(n.log(N)),並且可以是對大數據集昂貴。

另一個建議簡單計算直方圖的答案將無法正確計算百分比:這裏是一個反例:由100個數字組成的數據集,99個數字爲0,一個數字爲1。最終得到所有99 0在第一個垃圾箱中,最後一個垃圾箱中的1,中間有8個空垃圾箱。

0

另一種替代方法是使用double的RDD上的頂部和最後一個。例如,val percentile_99th_value = scores.top((count/100).toInt).last

此方法更適合個別百分位數。

3

我發現了這個要點

https://gist.github.com/felixcheung/92ae74bc349ea83a9e29

,包含以下功能:

/** 
    * compute percentile from an unsorted Spark RDD 
    * @param data: input data set of Long integers 
    * @param tile: percentile to compute (eg. 85 percentile) 
    * @return value of input data at the specified percentile 
    */ 
    def computePercentile(data: RDD[Long], tile: Double): Double = { 
    // NIST method; data to be sorted in ascending order 
    val r = data.sortBy(x => x) 
    val c = r.count() 
    if (c == 1) r.first() 
    else { 
     val n = (tile/100d) * (c + 1d) 
     val k = math.floor(n).toLong 
     val d = n - k 
     if (k <= 0) r.first() 
     else { 
     val index = r.zipWithIndex().map(_.swap) 
     val last = c 
     if (k >= c) { 
      index.lookup(last - 1).head 
     } else { 
      index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head) 
     } 
     } 
    } 
    } 
3

這是我在星火Python實現用於計算包含感興趣的值的RDD百分。

def percentile_threshold(ardd, percentile): 
    assert percentile > 0 and percentile <= 100, "percentile should be larger then 0 and smaller or equal to 100" 

    return ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) \ 
      .lookup(np.ceil(ardd.count()/100 * percentile - 1))[0] 

# Now test it out 
import numpy as np 
randlist = range(1,10001) 
np.random.shuffle(randlist) 
ardd = sc.parallelize(randlist) 

print percentile_threshold(ardd,0.001) 
print percentile_threshold(ardd,1) 
print percentile_threshold(ardd,60.11) 
print percentile_threshold(ardd,99) 
print percentile_threshold(ardd,99.999) 
print percentile_threshold(ardd,100) 

# output: 
# 1 
# 100 
# 6011 
# 9900 
# 10000 
# 10000 

另外,我定義了以下函數以獲得第10到第100百分位數。

def get_percentiles(rdd, stepsize=10): 
    percentiles = [] 
    rddcount100 = rdd.count()/100 
    sortedrdd = ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) 


    for p in range(0, 101, stepsize): 
     if p == 0: 
      pass 
      # I am not aware of a formal definition of 0 percentile, 
      # you can put a place holder like this if you want 
      # percentiles.append(sortedrdd.lookup(0)[0] - 1) 
     elif p == 100: 
      percentiles.append(sortedrdd.lookup(np.ceil(rddcount100 * 100 - 1))[0]) 
     else: 
      pv = sortedrdd.lookup(np.ceil(rddcount100 * p) - 1)[0] 
      percentiles.append(pv) 

    return percentiles 

randlist = range(1,10001) 
np.random.shuffle(randlist) 
ardd = sc.parallelize(randlist) 
get_percentiles(ardd, 10) 

# [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000] 
+0

不應該在'get_percentiles'的'sortedrdd'定義中將'ardd'替換爲'rdd'?以及添加'導入numpy爲np'。 IOT似乎不適用於'numpy 1.11.3' – Jorge 2017-08-24 07:27:00

5

如何T-消化

https://github.com/tdunning/t-digest

準確線上累積基於秩統計的新的數據結構,如分位數和修剪裝置。 t-digest算法也非常平行,使其在地圖縮減和並行流應用程序中非常有用。

t消化構造算法使用一維k均值聚類的變體來生成與Q摘要相關的數據結構。這個t消化數據結構可用於估計分位數或計算其他等級統計。 t-digest優於Q-digest的優點在於t-digest可以處理浮點值,而Q-digest僅限於整數。只要稍作修改,t-digest就可以處理任何有序集合中的任何值,這些集合的含義與平均值相似。儘管t-digests存儲在磁盤上時更加緊湊,但由t-digests產生的分位數估計的準確性可能比Q-digest所產生的分位數精確得多。

綜上所述,T-消化的特別有趣的特點是,它

  • 比Q-消化對雙打以及整數
  • 作品小總結。
  • 提供每百萬精度極端位數和通常<爲中間1000ppm的準確性部分分位數
  • 是快速
  • 非常簡單
  • 具有具有> 90%的測試覆蓋率的參考實現
  • 可以是所使用的map-reduce很容易,因爲消化可以合併

它應該是相當容易使用的參考Java的implem來自Spark的誘惑。

+1

其實Erik Erlandson在這裏有一個火花實現:https://github.com/isarn/isarn-sketches-spark。它效果很好。我發現唯一的解決方案就是不能將TDigest對象保存爲parquet格式。只要你只是扔了大量的數據,並要求獲得一些百分點的結果,那就是你正在尋找的東西:) – 2018-01-16 19:56:46

2

如果您不介意將您的RDD轉換爲DataFrame並使用Hive UDAF,則可以使用percentile。假設你裝HiveContext hiveContext到範圍:

hiveContext.sql("SELECT percentile(x, array(0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9)) FROM yourDataFrame")

this answer.