2016-12-01 65 views
1
for (fordate <- 2 to 30) { 
    val dataRDD = sc.textFile("s3n://mypath" + fordate + "/*") 
    val a = 1 
    val c = fordate - 1 
    for (b <- a to c) { 
    val cumilativeRDD1 = sc.textFile("s3n://mypath/" + b + "/*") 
    val cumilativeRDD : org.apache.spark.rdd.RDD[String] = sc.union(cumilativeRDD1, cumilativeRDD) 
    if (b == c) { 
     val incrementalDEviceIDs = dataRDD.subtract(cumilativeRDD) 
     val countofIDs = incrementalDEviceIDs.distinct().count() 
     println(s"201611 $fordate $countofIDs") 
    } 
    } 
} 

我有一個數據集,我每天都會得到deviceID。我需要弄清楚每天的增量計數,但是當我加入cumilativeRDD到它本身saysthrows以下錯誤:斯卡拉 - 將RDD附加到自身

forward reference extends over definition of value cumilativeRDD

我怎樣才能克服這一點。

+0

目前尚不清楚你想要達到的目標。你能描述這個過程的意圖嗎? – maasg

+0

因此,如果正確地明白的*意圖*是diff的'天[X]'與'總和[1,X-1](天[I])'。除了解決變量範圍的問題之外,這個實現對於來自網絡的數據是n^2。您閱讀「第1天」,「第2天」,「第n」次,...。我建議您檢查一下您的流程,以累計所看到的數據並進行比較。 – maasg

回答

1

的問題是這一行:

val cumilativeRDD : org.apache.spark.rdd.RDD[String] = sc.union(cumilativeRDD1 ,cumilativeRDD) 

它的聲明之前您正在使用cumilativeRDD。變量賦值從右到左工作。 =的右側定義了左側的變量。因此,您不能在自己的定義中使用該變量。因爲在等式的右邊,變量還不存在。

你必須初始化在第一次運行cumilativeRDD,然後你就可以用它在以下運行:

var cumilativeRDD: Option[org.apache.spark.rdd.RDD[String]] = None 
for (fordate <- 2 to 30) { 
    val DataRDD = sc.textFile("s3n://mypath" + fordate + "/*") 
    val c = fordate - 1 
    for (b <- 1 to c) { 
     val cumilativeRDD1 = sc.textFile("s3n://mypath/" + b + "/*") 
     if (cumilativeRDD.isEmpty) cumilativeRDD = Some(cumilativeRDD1) 
     else cumilativeRDD = Some(sc.union(cumilativeRDD1, cumilativeRDD.get)) 

     if (b == c) { 
     val IncrementalDEviceIDs = DataRDD.subtract(cumilativeRDD.get) 
     val countofIDs = IncrementalDEviceIDs.distinct().count() 
     println("201611" + fordate + " " + countofIDs) 
     } 
    } 
    } 
+0

即使這樣,它拋出了同樣的錯誤後:'參考向前延伸value' – toofrellik

+0

的定義,您能給一些方面?在哪條線上拋出這個錯誤? – gesundkrank

+0

_error:正向引用延伸超過定義值cumilativeRDD_ 'val cumilativeRDD:org.apache.spark.rdd.RDD [String] = sc.union(cumilativeRDD1,cumilativeRDD)' – toofrellik