2016-02-18 33 views
6

我想創建一個RDD來收集迭代計算的結果。創建一個RDD來收集迭代計算的結果

如何使用一個循環(或任何替代)來代替下面的代碼:

import org.apache.spark.mllib.random.RandomRDDs._  

val n = 10 

val step1 = normalRDD(sc, n, seed = 1) 
val step2 = normalRDD(sc, n, seed = (step1.max).toLong) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc, n, seed = (step2.max).toLong) 
val result2 = result1.zip(step3) 

... 

val step50 = normalRDD(sc, n, seed = (step49.max).toLong) 
val result49 = result48.zip(step50) 

(創建N個步驟RDDS並在結束在一起,然後壓縮和解也將是確定只要50個RDDS創建迭代地尊重種子=(步驟(N-1)的.max)條件)

+0

我最好使用'Stream.unfold'從scalaz以產生步驟流,然後壓縮它與本身和/或scanRight .. – Reactormonk

回答

6

遞歸函數將工作:

/** 
* The return type is an Option to handle the case of a user specifying 
* a non positive number of steps. 
*/ 
def createZippedNormal(sc : SparkContext, 
         numPartitions : Int, 
         numSteps : Int) : Option[RDD[Double]] = { 

    @scala.annotation.tailrec 
    def accum(sc : SparkContext, 
      numPartitions : Int, 
      numSteps : Int, 
      currRDD : RDD[Double], 
      seed : Long) : RDD[Double] = { 
    if(numSteps <= 0) currRDD 
    else { 
     val newRDD = normalRDD(sc, numPartitions, seed) 
     accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max) 
    } 
    } 

    if(numSteps <= 0) None 
    else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L)) 
} 
+0

尾遞歸不會保護你從RDD血統吹堆棧:) – zero323

+0

@ zero323同意。但是,這個問題與問題的要求是固有的。任何答案都會遇到類似的問題。 –

+0

只是想指出你正在構建一個不會被尾部優化的幕後的遞歸數據結構。沒有更多:)實際上你可以解決它並通過使用檢查點來避免這個問題。這是甚至可以解決沒有一個zip :) – zero323