2016-02-08 35 views
0

我已經使用映射步驟來創建包含我需要的一些對象的JavaRDD對象。基於這些對象,我想創建一個包含一些統計數據的全局散列表,但我無法弄清楚要使用哪個RDD操作。起初,我認爲減少將是解決方案,但後來我看到你必須返回相同類型的對象。我對減少物品不感興趣,但收集所有機器的所有統計數據(它們可以單獨計算,然後加上up_)。在apache-spark RDD操作(Java)中將數據添加到散列表中

例如: 我有一個包含整數數組的對象的RDD除此之外,我想計算每個整數出現在陣列中的次數,每個機器應該計算它自己的散列表,然後將它們全部放在驅動程序中的一個地方

回答

0

通常當你想要結束一個地圖時,你需要將RDD中的記錄轉換爲鍵值對,並使用reduceByKey

您的具體示例聽起來與着名的wordcount示例完全相同(請參見第一個示例here),只需要從一個對象內的數組中計數整數,而不是從句子(字符串)中計數單詞。在Scala中,這將轉化:

import org.apache.spark.rdd.RDD 
import scala.collection.Map 

class Example { 

    case class MyObj(ints: Array[Int], otherStuff: String) 

    def countInts(input: RDD[MyObj]): Map[Int, Int] = { 
    input 
     .flatMap(_.ints) // flatMap maps each record into several records - in this case, each int becomes a record 
     .map(i => (i, 1)) // turn into key-value map, with preliminary value 1 for each key 
     .reduceByKey(_ + _) // aggregate values by key 
     .collectAsMap()  // collects data into a Map 
    } 
} 

一般來說,你應該讓Spark在一個分佈式的方式執行儘可能多的操作可能,並儘可能推遲收集到內存中可能 - 如果你收集的值在減少之前,通常你會用完內存,除非你的數據集足夠小以至於不能開始(在這種情況下,你並不需要Spark)。

編輯:這裏是一個Java(更長的時間,但相同...)相同的代碼:

static class MyObj implements Serializable { 
     Integer[] ints; 
     String otherStuff; 
    } 

    Map<Integer, Integer> countInts(JavaRDD<MyObj> input) { 
     return input 
       .flatMap(new FlatMapFunction<MyObj, Integer>() { 
        @Override 
        public Iterable<Integer> call(MyObj myObj) throws Exception { 
         return Arrays.asList(myObj.ints); 
        } 
       }) // flatMap maps each record into several records - in this case, each int becomes a record 
       .mapToPair(new PairFunction<Integer, Integer, Integer>() { 
        @Override 
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception { 
         return new Tuple2<>(integer, 1); 
        } 
       }) // turn into key-value map, with preliminary value 1 
       .reduceByKey(new Function2<Integer, Integer, Integer>() { 
        @Override 
        public Integer call(Integer v1, Integer v2) throws Exception { 
         return v1 + v2; 
        } 
       }) // aggregate values by key 
       .collectAsMap();  // collects data into a Map 
    } 
+0

我最終使用的聚合操作。這正是我所期待的。我將提供另一個答案。 – Tomy