2015-10-17 51 views
4

我正在使用Scala中的Apache Spark。Spark關閉參數綁定

我試圖用第二個RDD的數據操作一個RDD時出現問題。我試圖將第二個RDD作爲參數傳遞給第一個RDD的「映射」函數,但看起來在該函數上創建的閉包綁定了該值的未初始​​化版本。

以下是一段簡單的代碼,顯示我看到的問題的類型。 (我第一次遇到麻煩的真實例子是更大,更不容易理解)。

我不太瞭解Spark閉包的參數綁定規則。

我真正需要的是如何使用另一個RDD的內容(之前在別處構建)來操作一個RDD的基本方法或模式。

在下面的代碼,調用Test1.process(SC)將失敗,在findSquare一個空指針訪問(如在閉合結合的第二ARG未被初始化)

object Test1 { 

    def process(sc: SparkContext) { 
    val squaresMap = (1 to 10).map(n => (n, n * n)) 
    val squaresRDD = sc.parallelize(squaresMap) 

    val primes = sc.parallelize(List(2, 3, 5, 7)) 

    for (p <- primes) { 
     println("%d: %d".format(p, findSquare(p, squaresRDD))) 
    } 
    } 

    def findSquare(n: Int, squaresRDD: RDD[(Int, Int)]): Int = { 
    squaresRDD.filter(kv => kv._1 == n).first._1 
    } 
} 

回答

4

問題,你的經驗無關,與閉合或RDDS其中,流行的看法相反,are serializable

它只是打破了一個基本的Spark規則,它規定你不能觸發一個動作或從另一個動作或轉換轉換*並且這個問題的不同變體已經被多次詢問。

要理解爲什麼這是你要想想該架構的情況:

  • SparkContext是對駕駛員管理
  • 裏面的一切變革是在工人執行的情況。每個工作人員只能訪問自己的部分數據,不能與其他工作人員進行交流。

如果你想使用你必須要使用的結合RDDS,像joincartesianzipunion變換的一個多RDDS的內容。

在這裏,您最有可能(我不知道爲什麼你傳遞數組和使用這個元組僅第一個元素)希望用廣播變量:

val squaresMapBD = sc.broadcast(squaresMap) 

def findSquare(n: Int): Seq[(Int, Int)] = { 
    squaresMapBD.value 
    .filter{case (k, v) => k == n} 
    .map{case (k, v) => (n, k)} 
    .take(1) 
} 

primes.flatMap(findSquare) 

或笛卡爾:

primes 
    .cartesian(squaresRDD) 
    .filter{case (n, (k, _)) => n == k}.map{case (n, (k, _)) => (n, k)} 

轉換primes接到虛對(Int, null)join會更有效:

primes.map((_, null)).join(squaresRDD).map(...) 

但根據您的意見我假設你有一個場景,當有自然連接條件。

根據上下文,還可以考慮使用數據庫或文件來存儲公用數據。

在旁註中RDD不可迭代,因此您不能簡單地使用for循環。爲了能夠做到這一點,你必須首先collect或將toLocalIterator轉換成。您也可以使用foreach方法。


*確切的說,您不能訪問SparkContext

** Torrent廣播和樹聚合涉及執行者之間的通信,所以它在技術上是可行的。

+1

好的,謝謝。我對Spark很新,並且還沒有遇到你提到的規則(或者如果我在某處看到它的話,它沒有打到家)。我很自然地想到在處理另一個數據集時「諮詢」一個數據集,但我顯然需要調整我的想法。 –

+1

感謝您的補充信息。我只有2周的時間學習Spark,所以我仍然試圖讓基本概念內化。廣播變量可能正是我認爲我想將一個arg傳遞給映射函數的一些情況。 –

+0

根據數據的大小,您應該按照以下順序傳遞它:參數 zero323

-3

RDD不能序列化,所以你不能在rdd轉換中使用rdd。 然後我從來沒有見過用for語句枚舉rdd,通常我使用的是foreach語句,它是rdd api的一部分。

爲了將數據從兩個RDD結合起來,你可以利用加入,工會或廣播(如果您的RDD小)

+1

我以前見過RDD可序列化的意見。我在下面的帖子中找到了相當恰當的答案。對於如何處理一個數據集與另一個數據集操作的一般情況(他們是根本不同類型的數據並且不自然地「加入」等),我仍然在腦海中留下了一個困惑。 stackoverflow.com/questions/29567247/serializing-rdd –

+0

我恰巧正在使用舊版本的Spark(1.2),正如其他帖子的答案中提到的那樣,當您嘗試執行我作爲SparkContext字段所做的操作時,會生成空指針異常在RDD被反序列化後沒有被設置。 –

+1

RDD可序列化請參閱[docs](https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.rdd.RDD)和[source](https: //github.com/apache/spark/blob/f85aa06464a10f5d1563302fd76465dded475a12/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L77)。 – zero323