我正在使用Scala中的Apache Spark。Spark關閉參數綁定
我試圖用第二個RDD的數據操作一個RDD時出現問題。我試圖將第二個RDD作爲參數傳遞給第一個RDD的「映射」函數,但看起來在該函數上創建的閉包綁定了該值的未初始化版本。
以下是一段簡單的代碼,顯示我看到的問題的類型。 (我第一次遇到麻煩的真實例子是更大,更不容易理解)。
我不太瞭解Spark閉包的參數綁定規則。
我真正需要的是如何使用另一個RDD的內容(之前在別處構建)來操作一個RDD的基本方法或模式。
在下面的代碼,調用Test1.process(SC)將失敗,在findSquare一個空指針訪問(如在閉合結合的第二ARG未被初始化)
object Test1 {
def process(sc: SparkContext) {
val squaresMap = (1 to 10).map(n => (n, n * n))
val squaresRDD = sc.parallelize(squaresMap)
val primes = sc.parallelize(List(2, 3, 5, 7))
for (p <- primes) {
println("%d: %d".format(p, findSquare(p, squaresRDD)))
}
}
def findSquare(n: Int, squaresRDD: RDD[(Int, Int)]): Int = {
squaresRDD.filter(kv => kv._1 == n).first._1
}
}
好的,謝謝。我對Spark很新,並且還沒有遇到你提到的規則(或者如果我在某處看到它的話,它沒有打到家)。我很自然地想到在處理另一個數據集時「諮詢」一個數據集,但我顯然需要調整我的想法。 –
感謝您的補充信息。我只有2周的時間學習Spark,所以我仍然試圖讓基本概念內化。廣播變量可能正是我認爲我想將一個arg傳遞給映射函數的一些情況。 –
根據數據的大小,您應該按照以下順序傳遞它:參數
zero323