2015-05-25 96 views
6

很新的火花和Scala語言,並希望工會爲以下列表中的所有RDDS(List<RDD> to RDD):星火:我如何工會列表<RDD>到RDD

val data = for (item <- paths) yield { 
     val ad_data_path = item._1 
     val ad_data = SparkCommon.sc.textFile(ad_data_path).map { 
      line => { 
       val ad_data = new AdData(line) 
       (ad_data.ad_id, ad_data) 
      } 
     }.distinct() 
    } 
val ret = SparkCommon.sc.parallelize(data).reduce(_ ++ _) 

我運行代碼在IntelliJ中,雖然總是得到一個錯誤:

ava.lang.NullPointerException 
at org.apache.spark.rdd.RDD.<init>(RDD.scala:125) 
at org.apache.spark.rdd.UnionRDD.<init>(UnionRDD.scala:59) 
at org.apache.spark.rdd.RDD.union(RDD.scala:438) 
at org.apache.spark.rdd.RDD.$plus$plus(RDD.scala:444) 
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99) 
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99) 
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:177) 
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:172) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) 
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) 
at org.apache.spark.InterruptibleIterator.reduceLeft(InterruptibleIterator.scala:28) 
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847) 
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845) 
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157) 
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

任何人有任何關於錯誤的想法嗎?感謝提前:)

回答

17

這可能是原因,

val listA = 1 to 10 
for(i <- listA; if i%2 == 0)yield {i} 

將返回向量(2,4,6),而

for(i <- listA; if i%2 == 0)yield {val c = i} 

將返回向量(( ),(),(),(),())

這就是你的情況。您正在初始化ad_data但未返回成功。

至於你的問題而言,即列表[RDD]到RDD

這裏是解決方案:

val listA = sc.parallelize(1 to 10) 
val listB = sc.parallelize(10 to 1 by -1) 

創造的2個RDDS

val listC = List(listA,listB) 
列表

轉換列表[RDD]到RDD

val listD = listC.reduce(_ union _) 

希望,這個回答你的問題。

+0

非常感謝,所述您的解決方案解決了問題。 – juffun

+0

@juffun,可以接受答案,如果解決方案爲你工作:) – Akash

+0

當然,已經接受。 – juffun

0

將RDD列表轉換爲RDD的另一種簡單方法。 SparkContext有兩個重載聯合方法,一種接受兩個RDDS等接受RDDS列表

聯盟(第一,靜止) 聯盟(RDDS:序號[RDD [T]]))