2014-10-31 57 views
1

對於以下RDD在星火定製RDD(顯示相關部分):調用收集()上產生一個空的集合

val myRdd = new RDD[RddOutput](zippedRows) { 

    override def compute(split: Partition, context: TaskContext): Iterator[RddOutput] = { 
     .. 
     val out = // computes a list of items 
    } 
    out.toIterator // Breakpoint set here: out is non-empty 
    } 

} 

當調用RDD:

val outVects = myRdd.collect 
val veclen = outVects(0).size // outVects is null! 

因此,作爲評論音符:compute()中的輸出迭代器非空,但是沒有從collect()調用返回的數據。有任何想法嗎?

+0

您是否真的需要創建一個新的RDD? zippedRows.map/mapPartitions足夠了嗎? – zsxwing 2014-10-31 10:03:06

+0

@zsxwing廣播正在推動這一選擇。如果/當這個問題得到解決,自定義RDD將不是必需的。 – javadba 2014-10-31 13:50:33

+0

沒有我們可以運行的例子,很難說出任何事情。這適用於我:'new RDD [Int](sc.parallelize(1 to 10)){override def compute(split:Partition,context:TaskContext)= Seq(1,2).iterator;重寫def getPartitions = firstParent [Int] .partitions} .collect'。 – 2014-10-31 16:06:16

回答

0

此問題似乎與不一致的分區方案有關。 MyRdd被賦予與父項相同的分區。然後我添加了一個轉換步驟,但沒有更新分區以指向新的父節點:分區指向新的祖父節點RDD。