2015-04-20

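Aggregate RDD values per key

I have an RDD with a key-value structure, (someKey, (measure1, measure2)). I grouped it by key, and now I want to aggregate the values for each key.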

val RDD1 : RDD[(String,(Int,Int))] 
RDD1.groupByKey() 

The result I need is:

key: avg(measure1), avg(measure2), max(measure1), max(measure2), min(measure1), min(measure2), count(*) 

Answer

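First of all, avoid groupByKey! Use aggregateByKey or combineByKey instead. We'll use aggregateByKey here. It transforms the values for each key: RDD[(K, V)] => RDD[(K, U)]. It needs a zero value of type U plus two functions that tell it how to merge: (U, V) => U, which folds one value into an accumulator, and (U, U) => U, which combines two accumulators. I simplified your example a little and compute: key: avg(measure1), avg(measure2), min(measure1), min(measure2), count(*)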

val rdd1 = sc.parallelize(List(("a", (11, 1)), ("a", (12, 3)), ("b", (10, 1))))

// Accumulator layout: (sum1, sum2, min1, min2, count)
rdd1
  .aggregateByKey((0.0, 0.0, Int.MaxValue, Int.MaxValue, 0))(
    {
      // Fold one (measure1, measure2) value into the accumulator.
      case ((sum1, sum2, min1, min2, count1), (v1, v2)) =>
        (sum1 + v1, sum2 + v2, v1 min min1, v2 min min2, count1 + 1)
    },
    {
      // Merge two partial accumulators for the same key.
      case ((sum1, sum2, min1, min2, count),
            (otherSum1, otherSum2, otherMin1, otherMin2, otherCount)) =>
        (sum1 + otherSum1, sum2 + otherSum2,
          min1 min otherMin1, min2 min otherMin2, count + otherCount)
    }
  )
  .map {
    // Turn the sums into averages.
    case (k, (sum1, sum2, min1, min2, count1)) =>
      (k, (sum1 / count1, sum2 / count1, min1, min2, count1))
  }
  .collect()

(a,(11.5,2.0,11,1,2)), (b,(10.0,1.0,10,1,1)) 
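
The question also asked for max(measure1) and max(measure2), which the simplified example leaves out. As a rough sketch of how the accumulator could be extended, the code below adds two maximum fields and checks the two merge functions on plain Scala collections, so it runs without Spark. The names Stats, add, merge and aggregateLocally are made up for illustration and are not part of the answer or of Spark; aggregateLocally only imitates the "fold within each partition, then merge partitions" behaviour on a Seq of Seqs.

```scala
// Hypothetical accumulator for one key; field names are illustrative only.
final case class Stats(sum1: Double, sum2: Double,
                       min1: Int, min2: Int,
                       max1: Int, max2: Int,
                       count: Long)

object Stats {
  // Zero value: neutral for sums, minima, maxima and the count.
  val zero: Stats =
    Stats(0.0, 0.0, Int.MaxValue, Int.MaxValue, Int.MinValue, Int.MinValue, 0L)

  // (U, V) => U: fold one (measure1, measure2) value into the accumulator.
  def add(acc: Stats, v: (Int, Int)): Stats = Stats(
    acc.sum1 + v._1, acc.sum2 + v._2,
    acc.min1 min v._1, acc.min2 min v._2,
    acc.max1 max v._1, acc.max2 max v._2,
    acc.count + 1)

  // (U, U) => U: merge two partial accumulators for the same key.
  def merge(a: Stats, b: Stats): Stats = Stats(
    a.sum1 + b.sum1, a.sum2 + b.sum2,
    a.min1 min b.min1, a.min2 min b.min2,
    a.max1 max b.max1, a.max2 max b.max2,
    a.count + b.count)
}

object StatsDemo {
  // Local stand-in for aggregateByKey (an assumption for testing, not Spark code):
  // fold every simulated partition by key, then merge the per-partition maps.
  def aggregateLocally(
      partitions: Seq[Seq[(String, (Int, Int))]]): Map[String, Stats] =
    partitions
      .map(_.foldLeft(Map.empty[String, Stats]) { case (m, (k, v)) =>
        m.updated(k, Stats.add(m.getOrElse(k, Stats.zero), v))
      })
      .foldLeft(Map.empty[String, Stats]) { (left, right) =>
        right.foldLeft(left) { case (m, (k, s)) =>
          m.updated(k, m.get(k).map(Stats.merge(_, s)).getOrElse(s))
        }
      }

  def main(args: Array[String]): Unit = {
    // Same sample data as above, split across two simulated partitions.
    val partitions = Seq(
      Seq(("a", (11, 1)), ("b", (10, 1))),
      Seq(("a", (12, 3))))
    aggregateLocally(partitions).toSeq.sortBy(_._1).foreach { case (k, s) =>
      println(s"$k: avg1=${s.sum1 / s.count}, avg2=${s.sum2 / s.count}, " +
        s"min=(${s.min1}, ${s.min2}), max=(${s.max1}, ${s.max2}), count=${s.count}")
    }
  }
}
```

With Spark on the classpath, the same three pieces should plug into the real call as rdd1.aggregateByKey(Stats.zero)(Stats.add, Stats.merge), followed by a map that divides the sums by the count; that part is untested here.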

Can you tell me how to do this in Java?