2017-05-24 114 views
1

我的目標是處理一系列通過調用org.apache.spark.rdd.RDD[_].saveAsObjectFile(...)產生SequenceFile文件夾。我的文件夾結構與此類似:範圍界定問題的foreach斯卡拉

\MyRootDirectory 
    \Batch0001 
    _SUCCESS 
    part-00000 
    part-00001 
    ... 
    part-nnnnn 
    \Batch0002 
    _SUCCESS 
    part-00000 
    part-00001 
    ... 
    part-nnnnn 
    ... 
    \Batchnnnn 
    _SUCCESS 
    part-00000 
    part-00001 
    ... 
    part-nnnnn 

我需要提取一些持久的數據,但是我的收藏 - 我是否使用ListBuffermutable.Map,或任何其他可變類型,失去範圍,似乎是newed上來就sequenceFile(...).foreach

每次迭代概念的以下證明產生了一系列的「處理目錄......」接着是「1:1」的反覆,從不增加,如我所料counterintList.size做。

private def proofOfConcept(rootDirectoryName: String) = { 
    val intList = ListBuffer[Int]() 
    var counter: Int = 0 
    val config = new SparkConf().setAppName("local").setMaster("local[1]") 
    new File(rootDirectoryName).listFiles().map(_.toString).foreach { folderName => 
     println(s"Processing directory $folderName...") 
     val sc = new SparkContext(config) 
     sc.setLogLevel("WARN") 
     sc.sequenceFile(folderName, classOf[NullWritable], classOf[BytesWritable]).foreach(f => { 
     counter += 1 
     intList += counter 
     println(s" $counter : ${intList.size}") 
     }) 
     sc.stop() 
    } 
    } 

輸出:

"C:\Program Files\Java\jdk1.8.0_111\bin\java" ... 
Processing directory C:\MyRootDirectory\Batch0001... 
17/05/24 09:30:25.228 WARN [main] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
[Stage 0:>               (0 + 0)/57] 1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
Processing directory C:\MyRootDirectory\Batch0002... 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
Processing directory C:\MyRootDirectory\Batch0003... 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
+0

https://spark.apache.org/docs/latest/programming-guide.html#understanding-closures – zero323

+0

當Spark不在圖片中時,您是否看到這個? –

回答

1

內部foreach功能是在火花工人JVM運行,而不是在客戶端JVM,其中變量定義內。該工作人員在本地獲取該變量的副本,將其增加並打印出來。我的猜測是你在本地測試這個嗎?如果你是在生產,分配火花環境中運行這個,你甚至看不到那些打印輸出。

更普遍,幾乎所有的功能,您傳遞到可能會被遠程實際執行的RDD的方法之一,將不會有任何局部變量或任何可變的訪問。它會得到一個基本不變的快照。

如果你想將數據從火花的分佈式存儲移回客戶端,使用RDD的collect方法。反向與sc.parallelize完成。但是請注意,這兩種通常做得非常罕見,因爲它們不併行發生。