2015-11-02 100 views
14

我有一個名爲RDD初始化RDD清空

JavaPairRDD<String, List<String>> existingRDD; 

現在我需要初始化這個existingRDD爲空,這樣,當我得到的實際RDD的我可以做一個工會與此existingRDD。 如何將existingRDD初始化爲空RDD,但將其初始化爲空? 這裏是我的代碼:

JavaPairRDD<String, List<String>> existingRDD; 
if(ai.get()%10==0) 
{ 
    existingRDD.saveAsNewAPIHadoopFile("s3://manthan-impala-test/kinesis-dump/" + startTime + "/" + k + "/" + System.currentTimeMillis() + "/", 
    NullWritable.class, Text.class, TextOutputFormat.class); //on worker failure this will get overwritten         
} 
else 
{ 
    existingRDD.union(rdd); 
} 
+0

我不知道我非常理解你正在嘗試做什麼。你想在一個空的RDD上創建一個聯合?做什麼的? – eliasah

+0

是的。當我通過eachRDD循環時,我的意思是foreachRDD在我的代碼中,我需要將rdd與這個現有的RDD結合起來,這樣最後我可以將這個現有的RDD保存到我的s3中。 –

+0

爲什麼不能簡單地將'existingRDD'設置爲'rdd'寫入s3之後的第一次迭代? –

回答

21

我仍然不知道你正在嘗試做的,但隨後你可以創建一個空的RDD:

// Get an RDD that has no partitions or elements. 
JavaRDD<T> emptyRDD = sc.emptyRDD() 

我相信你知道怎麼使用泛型,否則,那就是:

JavaRDD<Tuple2<String,List<String>>> emptyRDD = sc.emptyRDD(); 
JavaPairRDD<String,List<String>> emptyPairRDD = JavaPairRDD.fromJavaRDD(
    existingRDD 
); 

您也可以使用mapToPair方法來轉換JavaRDDJavaPairRDD

解決辦法:

scala> val emptyRDD = sc.emptyRDD 
// emptyRDD: org.apache.spark.rdd.EmptyRDD[Nothing] = EmptyRDD[1] at ... 
0

在Scala中,我使用的 「並行化」 命令。

val emptyRDD = sc.parallelize(Seq("")) 
+1

我很確定,用一個條目(空字符串)創建了一個RDD [String]。 –

0

@eliasah答案非常有用,我提供代碼來創建空對RDD。考慮一個需要創建空對RDD(密鑰,值)的場景。以下scala代碼說明了如何使用鍵作爲字符串並將值作爲Int創建空對RDD。

resultRDD: org.apache.spark.rdd.EmptyRDD[(String, Int)] = EmptyRDD[0] at emptyRDD at <console>:29 
0

在Java中,創建空RDD有點複雜:

type pairRDD = (String,Int) 
var resultRDD = sparkContext.emptyRDD[pairRDD] 

如下RDD將被創建。我試過使用scala.reflect.classTag,但它也不起作用。經過多次測試後,工作的代碼更加簡單。

private JavaRDD<Foo> getEmptyJavaRdd() { 

/* this code does not compile because require <T> as parameter into emptyRDD */ 
//  JavaRDD<Foo> emptyRDD = sparkContext.emptyRDD(); 
//  return emptyRDD; 

/* this should be the solution that try to emulate the scala <T> */ 
/* but i could not make it work too */ 
//  ClassTag<Foo> tag = scala.reflect.ClassTag$.MODULE$.apply(Foo.class); 
//  return sparkContext.emptyRDD(tag); 

/* this alternative worked into java 8 */ 
    return SparkContext.parallelize(
      java.util.Arrays.asList() 
    ); 

}