1
對於以下RDD在星火定製RDD(顯示相關部分):調用收集()上產生一個空的集合
val myRdd = new RDD[RddOutput](zippedRows) {
override def compute(split: Partition, context: TaskContext): Iterator[RddOutput] = {
..
val out = // computes a list of items
}
out.toIterator // Breakpoint set here: out is non-empty
}
}
當調用RDD:
val outVects = myRdd.collect
val veclen = outVects(0).size // outVects is null!
因此,作爲評論音符:compute()中的輸出迭代器非空,但是沒有從collect()調用返回的數據。有任何想法嗎?
您是否真的需要創建一個新的RDD? zippedRows.map/mapPartitions足夠了嗎? – zsxwing 2014-10-31 10:03:06
@zsxwing廣播正在推動這一選擇。如果/當這個問題得到解決,自定義RDD將不是必需的。 – javadba 2014-10-31 13:50:33
沒有我們可以運行的例子,很難說出任何事情。這適用於我:'new RDD [Int](sc.parallelize(1 to 10)){override def compute(split:Partition,context:TaskContext)= Seq(1,2).iterator;重寫def getPartitions = firstParent [Int] .partitions} .collect'。 – 2014-10-31 16:06:16