2016-05-29

How can I compute the standard deviation and mean on a Java Spark RDD? I have a JavaRDD that looks like this:

[ 
[A,8] 
[B,3] 
[C,5] 
[A,2] 
[B,8] 
... 
... 
] 

I want the result to be the mean for each key:

[ 
[A,5] 
[B,5.5] 
[C,5] 
] 

How can I do this using only Java RDDs? P.S.: I want to avoid groupBy operations, which is why I am not using DataFrames.
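For reference, the per-key statistics over the five rows shown in the question (the rows elided by `...` are ignored) work out as below. This assumes the population standard deviation, which divides by the number of values n_k; a sample standard deviation would divide by n_k - 1 instead.

```latex
\begin{aligned}
\mu_k &= \frac{1}{n_k}\sum_{i=1}^{n_k} x_{k,i},
\qquad
\sigma_k = \sqrt{\frac{1}{n_k}\sum_{i=1}^{n_k}\left(x_{k,i}-\mu_k\right)^2} \\
\mu_A &= \tfrac{8+2}{2} = 5,
\qquad
\sigma_A = \sqrt{\tfrac{(8-5)^2+(2-5)^2}{2}} = \sqrt{9} = 3 \\
\mu_B &= \tfrac{3+8}{2} = 5.5,
\qquad
\sigma_B = \sqrt{\tfrac{(3-5.5)^2+(8-5.5)^2}{2}} = \sqrt{6.25} = 2.5 \\
\mu_C &= 5,
\qquad
\sigma_C = 0
\end{aligned}
```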


Regarding 'groupBy' on a 'DataFrame', see: http://stackoverflow.com/q/32902982/1560062. For the rest: http://stackoverflow.com/q/35780331/1560062 and http://stackoverflow.com/a/35407148/1560062 – zero323

Answers


Here you go:

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.util.StatCounter; 
import scala.Tuple2; 
import scala.Tuple3; 

import java.util.Arrays; 
import java.util.List; 

public class AggregateByKeyStatCounter { 

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("AggregateByKeyStatCounter").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> myList = Arrays.asList(new Tuple2<>("A", 8), new Tuple2<>("B", 3), new Tuple2<>("C", 5),
            new Tuple2<>("A", 2), new Tuple2<>("B", 8));

        JavaRDD<Tuple2<String, Integer>> data = sc.parallelize(myList);
        JavaPairRDD<String, Integer> pairs = JavaPairRDD.fromJavaRDD(data);

        /* aggregateByKey folds each key's values into a StatCounter: StatCounter::merge(double)
           within a partition and StatCounter::merge(StatCounter) across partitions. The
           StatCounter also exposes count(), min(), max() and variance(), so more statistics
           are available. stdev() is the population standard deviation; use sampleStdev()
           for the sample (n - 1) version. */
        JavaRDD<Tuple3<String, Double, Double>> output = pairs
            .aggregateByKey(
                new StatCounter(),
                StatCounter::merge,
                StatCounter::merge)
            // Emits (key, stdev, mean) for each key
            .map(x -> new Tuple3<String, Double, Double>(x._1(), x._2().stdev(), x._2().mean()));

        output.collect().forEach(System.out::println);

        sc.stop();
    }

} 
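To illustrate what aggregateByKey does with a mergeable accumulator such as StatCounter, here is a plain-Java sketch with no Spark dependency (Java 9+ for List.of and Map.entry). The Stats class and the aggregateByKey helper are hypothetical stand-ins written for illustration, not Spark's API: within each "partition" values are folded into a fresh accumulator per key (the seqOp, using Welford's online update), and the per-partition accumulators are then merged key by key (the combOp, using the pairwise variance formula). The two-partition split of the sample rows is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark) of the aggregateByKey pattern: within each
// "partition" values are folded into a fresh per-key accumulator (seqOp),
// then the per-partition accumulators are merged key by key (combOp).
public class AggregateByKeySketch {

    // Hypothetical accumulator in the spirit of StatCounter: count, running mean
    // and M2 (sum of squared deviations from the mean).
    static final class Stats {
        long n = 0;
        double mean = 0.0;
        double m2 = 0.0;

        // seqOp: fold one value in (Welford's online update).
        Stats add(double x) {
            n++;
            double delta = x - mean;
            mean += delta / n;
            m2 += delta * (x - mean);
            return this;
        }

        // combOp: merge another accumulator in (pairwise variance formula).
        Stats merge(Stats o) {
            if (o.n == 0) return this;
            if (n == 0) {
                n = o.n; mean = o.mean; m2 = o.m2;
                return this;
            }
            long total = n + o.n;
            double delta = o.mean - mean;
            mean += delta * o.n / total;
            m2 += o.m2 + delta * delta * n * o.n / total;
            n = total;
            return this;
        }

        // Population standard deviation (divides by n, not n - 1).
        double stdev() {
            return n == 0 ? Double.NaN : Math.sqrt(m2 / n);
        }
    }

    static Map<String, Stats> aggregateByKey(List<List<Map.Entry<String, Integer>>> partitions) {
        Map<String, Stats> result = new HashMap<>();
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            // seqOp: each partition starts from a fresh zero value per key
            Map<String, Stats> local = new HashMap<>();
            for (Map.Entry<String, Integer> e : partition) {
                local.computeIfAbsent(e.getKey(), k -> new Stats()).add(e.getValue());
            }
            // combOp: merge this partition's accumulators into the running result
            local.forEach((k, s) -> result.merge(k, s, Stats::merge));
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed split of the question's five sample rows into two partitions.
        List<List<Map.Entry<String, Integer>>> partitions = List.of(
            List.of(Map.entry("A", 8), Map.entry("B", 3), Map.entry("C", 5)),
            List.of(Map.entry("A", 2), Map.entry("B", 8)));
        Map<String, Stats> stats = aggregateByKey(partitions);
        for (String key : List.of("A", "B", "C")) {
            Stats s = stats.get(key);
            System.out.println(key + " mean=" + s.mean + " stdev=" + s.stdev());
        }
    }
}
```

For the five sample rows this prints A mean=5.0 stdev=3.0, B mean=5.5 stdev=2.5 and C mean=5.0 stdev=0.0, and the result does not depend on how the rows are split across partitions, which is exactly the property aggregateByKey relies on.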

You can use reduceByKey to compute the sum and count for each key, then divide the sum by the count to get each key's mean (in Scala):

import org.apache.spark.rdd.RDD

// rdd: RDD[(String, Int)] holding the (key, value) pairs from the question
val means: RDD[(String, Double)] = rdd
  .map(x => (x._1, (x._2, 1)))                        // pair each value with a count of 1
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))  // per key: (sum, count)
  .map { case (k, v) => (k, v._1.toDouble / v._2) }   // mean = sum / count as a Double (avoids integer division)
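The reduceByKey approach above only yields the mean, while the question also asks for the standard deviation. A minimal plain-Java sketch (no Spark; Java 9+), assuming the same five sample rows, extends the (sum, count) pair to a hypothetical (sum, count, sumSq) triple so the population standard deviation comes out of the same single reduction. In Spark's Java API the same combine function would be passed to reduceByKey on a JavaPairRDD built with mapToPair; here a TreeMap with Map.merge stands in for the shuffle.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Spark) of the reduceByKey approach, extended from
// (sum, count) to (sum, count, sum of squares) so stdev comes from the same pass.
public class ReduceByKeySketch {

    // Hypothetical immutable partial aggregate for one key.
    static final class Partial {
        final double sum;
        final long count;
        final double sumSq;

        Partial(double sum, long count, double sumSq) {
            this.sum = sum;
            this.count = count;
            this.sumSq = sumSq;
        }

        // Equivalent of the first map step: one value x becomes (x, 1, x*x).
        static Partial of(int x) {
            return new Partial(x, 1, (double) x * x);
        }

        // Associative and commutative, as a reduceByKey function must be.
        Partial plus(Partial o) {
            return new Partial(sum + o.sum, count + o.count, sumSq + o.sumSq);
        }

        // Floating-point division, so key B gives 5.5 rather than 5.
        double mean() {
            return sum / count;
        }

        // Population stdev via E[x^2] - E[x]^2, clamped at 0 against rounding error.
        double stdev() {
            double m = mean();
            return Math.sqrt(Math.max(0.0, sumSq / count - m * m));
        }
    }

    static Map<String, Partial> reduceByKey(List<Map.Entry<String, Integer>> rows) {
        Map<String, Partial> acc = new TreeMap<>(); // sorted keys for stable output
        for (Map.Entry<String, Integer> row : rows) {
            acc.merge(row.getKey(), Partial.of(row.getValue()), Partial::plus);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rows = List.of(
            Map.entry("A", 8), Map.entry("B", 3), Map.entry("C", 5),
            Map.entry("A", 2), Map.entry("B", 8));
        reduceByKey(rows).forEach((k, p) ->
            System.out.println(k + " mean=" + p.mean() + " stdev=" + p.stdev()));
    }
}
```

The E[x^2] - E[x]^2 formula is cheap but can lose precision when the variance is small relative to the mean; an incremental (Welford-style) update, as used by StatCounter, is numerically safer.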