通常當你想要結束一個地圖時,你需要將RDD中的記錄轉換爲鍵值對,並使用reduceByKey
。
您的具體示例聽起來與着名的wordcount示例完全相同(請參見第一個示例here),只需要從一個對象內的數組中計數整數,而不是從句子(字符串)中計數單詞。在Scala中,這將轉化:
import org.apache.spark.rdd.RDD
import scala.collection.Map
class Example {
case class MyObj(ints: Array[Int], otherStuff: String)
def countInts(input: RDD[MyObj]): Map[Int, Int] = {
input
.flatMap(_.ints) // flatMap maps each record into several records - in this case, each int becomes a record
.map(i => (i, 1)) // turn into key-value map, with preliminary value 1 for each key
.reduceByKey(_ + _) // aggregate values by key
.collectAsMap() // collects data into a Map
}
}
一般來說,你應該讓Spark在一個分佈式的方式執行儘可能多的操作可能,並儘可能推遲收集到內存中可能 - 如果你收集的值在減少之前,通常你會用完內存,除非你的數據集足夠小以至於不能開始(在這種情況下,你並不需要Spark)。
編輯:這裏是一個Java(更長的時間,但相同...)相同的代碼:
static class MyObj implements Serializable {
Integer[] ints;
String otherStuff;
}
Map<Integer, Integer> countInts(JavaRDD<MyObj> input) {
return input
.flatMap(new FlatMapFunction<MyObj, Integer>() {
@Override
public Iterable<Integer> call(MyObj myObj) throws Exception {
return Arrays.asList(myObj.ints);
}
}) // flatMap maps each record into several records - in this case, each int becomes a record
.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<>(integer, 1);
}
}) // turn into key-value map, with preliminary value 1
.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}) // aggregate values by key
.collectAsMap(); // collects data into a Map
}
我最終使用的聚合操作。這正是我所期待的。我將提供另一個答案。 – Tomy