2016-10-11 50 views
0

當我迭代集合並在迭代遍佈數組時將項添加到數組中似乎爲空時,spark 1.6.0(我對spark和scala不太熟悉)。迭代後Spark數組是空的

var testing = unlabeled.map { line => 
    val parts = line.split(',') 
    val text = parts(7).split(' ') 
    (line, htf.transform(text)) 
} 

var lowPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)] 
var highPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)] 

for(counter <- 1 to 5){ 

    logger.info("this is the " + counter + " run -----------------") 
    for (i <- testing) { 
    val label = model.predict(i._2).toString 
    //  logger.info(i._1.split(",")(7)) 
    //  logger.info(label) 
    var probs = model.predictProbabilities(i._2) 
    logger.info("prob 0 : " + probs(0)) 
    logger.info("prob 1 : " + probs(1)) 
    logger.info("--------------------- ") 

    if (probs(0).toDouble <= 0.95 && probs(1).toDouble <= 0.95) { 
     lowPropQueue.+=(i) 
    } else { 
     highPropQueue.+=((i._1 + "," + label , i._2)) 
    } 

    logger.info("size of high array : " + highPropQueue.length) 
    logger.info("size of low array : " + lowPropQueue.length) 

    } 

    logger.info("passed: " + lowPropQueue.length) 
    logger.info("NOT passed: " + highPropQueue.length) 

    var xx= sc.parallelize(highPropQueue).collect() 
    var yy = sc.parallelize(lowPropQueue).collect() 

    logger.info("passed: " + xx.length) 
    logger.info("NOT passed: " + yy.length) 
... 
} 

但是基於內環似乎元素添加到陣列中的日誌,即:

16/10/11 11時22分31秒INFO SelfLearningMNB $:高數組大小: 500

16/10/11 11時22分31秒INFO SelfLearningMNB $:83

16/10/11 11時22分31秒INFO SelfLearningMNB $:低陣列的大小概率0:0.37094327822665185

16/10/11 11:22:31信息SelfLearningMNB $:概率1:0.6290567217733481

16/10/11 11:22:31信息SelfLearningMNB $:------------ ---------

16/10/11 11時22分31秒INFO SelfLearningMNB $:500

16/10/11 11時22分31秒INFO SelfLearningMNB $:高陣列的大小:84

16/10/11 11時22分31秒INFO SelfLearningMNB $:低陣列的大小概率0:0.16872929936216619

16/10/11 11時22分31秒INFO SelfLearningMNB $:概率1:0.8312707006378338

但是,當內循環結束我得到這個:

16/10/11 11時43分五十三秒INFO SelfLearningMNB $:通過:0

16/10/11 11:43:53信息SelfLearningMNB $:未通過:0

這是怎麼回事?

編輯

你怎麼會從執行者獲取數據或保存從執行者到HDFS的數據,使他們能夠從後面主節點讀取?

回答

1

TL; DR這不能在Spark中工作。

這是怎麼回事?

  • 每個執行都有自己的lowPropQueuehighPropQueue副本。
  • 迭代期間本地副本是否被修改
  • 後迭代的本地副本將被丟棄

FYI天真追加到ArrayBuffer不是線程安全的。

+0

我雖然這一點。但是如何將來自執行者的數據存儲到「全局」數組呢? – bill

+0

您可以嘗試累加器,但您需要同步訪問權限並查看您的代碼,但它不會擴展。 – 2016-10-11 10:21:07

+0

我做了一些搜索,這種方法絕對不適合Spark。我不得不映射一切,但它的工作。 – bill