2

假設元組的RDD存在類似如下:Spark RDD:如何最有效地計算統計信息?

(key1, 1) 
(key3, 9) 
(key2, 3) 
(key1, 4) 
(key1, 5) 
(key3, 2) 
(key2, 7) 
... 

什麼是計算對應於每個關鍵統計數據的最有效的(和,理想情況下,分佈式)的方式? (此刻,我要找計算標準偏差/方差,尤其如此。)據我所知,我的選擇權相當於:

  1. 使用colStats function in MLLib此方法具有易於適應優勢如果認爲有必要進行其他統計計算,則可以在以後使用其他mllib.stat函數。然而,它運行在包含每列數據的RDD Vector上,據我瞭解,這種方法需要在單個節點上收集每個密鑰的全部值,這對於大型企業而言似乎並不理想數據集。 Spark Vector是否總是暗示Vector中的數據本地駐留在單個節點上?
  2. 執行groupByKey,然後stats可能洗牌重,as a result of the groupByKey operation
  3. 執行aggregateByKey,初始化新StatCounter,並使用StatCounter::merge的順序和組合功能:這是方法recommended by this StackOverflow answer,避免從選項2.然而groupByKey,我一直沒能找到好的文檔PySpark中的StatCounter

我喜歡選項1,因爲它使代碼更可擴展的,因爲它可以很容易地適應使用具有類似合同其他MLLib功能更復雜的計算,但如果Vector輸入固有地要求該數據集在本地收集那麼它會限制代碼可以有效運行的數據大小。在另外兩個之間,選項3 看起來效率更高,因爲它避免了groupByKey,但我希望確認是這種情況。

有沒有其他的選擇我沒有考慮過? (我目前使用Python + PySpark,但如果語言有差異,我也可以使用Java/Scala解決方案。)

+0

可能重複[在單個數據中發現帶有pyspark的最小/最大值](http://stackoverflow.com/questions/36559809/finding-min-max-with-pyspark-in-single-pass-over -數據) –

回答

2

您可以試試reduceByKey。這是很簡單的,如果我們只是想計算min()

rdd.reduceByKey(lambda x,y: min(x,y)).collect() 
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)] 

要計算mean,您首先需要創建(value, 1)元組,我們使用的reduceByKey操作同時計算sumcount。最後我們把它們通過彼此在mean到達:

meanRDD = (rdd 
      .mapValues(lambda x: (x, 1)) 
      .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) 
      .mapValues(lambda x: x[0]/x[1])) 

meanRDD.collect() 
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)] 

對於variance,您可以用公式(sumOfSquares/count) - (sum/count)^2, 我們通過以下方式轉換:

varRDD = (rdd 
      .mapValues(lambda x: (1, x, x*x)) 
      .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2])) 
      .mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2))) 

varRDD.collect() 
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)] 

我使用的值類型double而不是int中的虛擬數據準確地說明計算的平均值和方差:

rdd = sc.parallelize([("key1", 1.0), 
         ("key3", 9.0), 
         ("key2", 3.0), 
         ("key1", 4.0), 
         ("key1", 5.0), 
         ("key3", 2.0), 
         ("key2", 7.0)])