2015-05-25 86 views
0

我有一個數組,它在封閉內(它有一些值),但在循環外,數組大小爲0.我想知道是什麼原因導致行爲如此?數組火花關閉

我需要hArr可以在批處理HBase之外訪問。

val hArr = new ArrayBuffer[Put]() 

rdd.foreach(row => { 
    val hConf = HBaseConfiguration.create() 
    val hTable = new HTable(hConf, tablename) 
    val hRow = new Put(Bytes.toBytes(row._1.toString)) 
    hRow.add(...) 
    hArr += hRow 
    println("hArr: " + hArr.toArray.mkString(",")) 
}) 

println("hArr.size: " + hArr.size) 
+0

我今天見過類似的東西http://stackoverflow.com/q/30437856/210905 – Odomontois

回答

0

問題是,rdd閉包中的任何項目都被複制並使用本地版本。 foreach只能用於保存到磁盤或沿着這些線路的東西。

如果你想要這個數組中,那麼你可以map然後collect

rdd.map(row=> { 
    val hConf = HBaseConfiguration.create() 
    val hTable = new HTable(hConf, tablename) 
    val hRow = new Put(Bytes.toBytes(row._1.toString)) 
    hRow.add(...) 
    hRow 
}).collect() 
0

我發現了相當長的一段新的Spark用戶感到困惑,他們是如何映射器和減速功能得到運行,如何與在驅動程序中定義的東西。一般來說,您所定義並通過map或foreach或reduceByKey或許多其他變體進行了註冊的mapper/reducer功能都不會在您的驅動程序中執行。在您的驅動程序中,您只需將它們註冊爲Spark即可遠程分佈式運行它們。當這些函數引用您在驅動程序中實例化的某些對象時,您實際上創建了一個「Closure」,它會在大多數時間編譯OK。但通常情況下,這不是你想要的,你會在運行時遇到問題,通過查看NotSerializable或ClassNotFound異常。

您可以通過foreach()變體遠程執行所有輸出工作,也可以嘗試通過調用collect()將所有數據回收到驅動程序中輸出。但是請注意collect(),因爲它會將來自分佈式節點的所有數據收集到您的驅動程序中。只有當你確定你的最終彙總數據很小時,你纔會這樣做。