2015-08-21 208 views
-1

我正在從HDFS加載文件到JavaRDD,並且想要更新那個RDD。爲此,我將其轉換爲IndexedRDDhttps://github.com/amplab/spark-indexedrdd),但我無法獲得Classcast Exception。 基本上我會做出關鍵值對並更新密鑰。 IndexedRDD支持更新。有什麼方法可以轉換嗎?Spark RDD更新

JavaPairRDD<String, String> mappedRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, String>() 
    { 
     @Override 
     public Iterable<Tuple2<String, String>> call(String arg0) throws Exception { 

      String[] arr = arg0.split(" ",2); 
      System.out.println("lenght" + arr.length); 
      List<Tuple2<String, String>> results = new ArrayList<Tuple2<String, String>>(); 
      results.addAll(results); 
      return results; 
     } 
    });   

    IndexedRDD<String,String> test = (IndexedRDD<String,String>) mappedRDD.collectAsMap(); 
+0

您正在使用哪個版本的spark? – eliasah

+0

\t \t \t org.apache.spark \t \t \t 火花core_2.10 \t \t \t 1.4.1 \t \t vsingh28

回答

0

collectAsMap()回報java.util.Map包含從JavaPairRDD所有條目,但沒有星火有關。我的意思是,該功能是收集一個節點中的值並使用普通Java。因此,您不能將它投射到IndexedRDD或任何其他RDD類型,因爲它只是普通的Map

我沒有用過IndexedRDD,但是從例子中可以看到,你需要傳遞給它的構造函數來創建它PairRDD

// Create an RDD of key-value pairs with Long keys. 
val rdd = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0))) 
// Construct an IndexedRDD from the pairs, hash-partitioning and indexing 
// the entries. 
val indexed = IndexedRDD(rdd).cache() 

因此,在你的代碼應該是:

IndexedRDD<String,String> test = new IndexedRDD<String,String>(mappedRDD.rdd()); 
+0

試過這種的編譯時錯誤說 - 構造IndexedRDD (JavaPairRDD )未定義 – vsingh28

+0

嗯,試試'new IndexedRDD (mappedRDD.rdd());' – Balduz

+0

相同的編譯時錯誤構造函數沒有爲此定義,不工作 – vsingh28