2017-12-27 359 views
0

爲什麼MapPartition中的ArrayBuffer似乎具有尚未遍歷的元素?Spark - 爲什麼ArrayBuffer似乎獲取尚未遍歷的元素

例如,我看這段代碼的方式,第一項應該有1個元素,第二個2,第三個3等等。第一個ArrayBuffer輸出可能有9個項目。這似乎意味着在第一次輸出之前有9次迭代,但收益計數清楚地表明這是第一次迭代。

val a = ArrayBuffer[Int]() 
for(i <- 1 to 9) a += i 
for(i <- 1 to 9) a += 9-i 
val rdd1 = sc.parallelize(a.toArray()) 

def timePivotWithLoss(iter: Iterator[Int]) : Iterator[Row] = { 
    val currentArray = ArrayBuffer[Int]() 
    var loss = 0 
    var yields = 0 
    for (item <- iter) yield { 
     currentArray += item 
     //var left : Int = -1 
     yields += 1 
     Row(yields, item.toString(), currentArray) 
    } 
} 

rdd1.mapPartitions(it => timePivotWithLoss(it)).collect() 

輸出 -

[1,1,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 
[2,2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 
[3,3,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 
[4,4,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 
[5,5,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 
[6,6,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 
[7,7,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 
[8,8,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 
[9,9,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 
[1,8,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)] 
[2,7,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)] 
[3,6,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)] 
[4,5,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)] 
[5,4,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)] 
[6,3,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)] 
[7,2,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)] 
[8,1,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)] 
[9,0,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)] 

回答

1

這是因爲在分隔使用參照同可變對象的所有行。溢出到光盤可能進一步使它不確定,某些對象被序列化並且不能反映這些變化。

可以使用可變引用和不可變對象:

def timePivotWithLoss(iter: Iterator[Int]) : Iterator[Row] = { 
    var currentArray = Vector[Int]() 
    var loss = 0 
    var yields = 0 
    for (item <- iter) yield { 
    currentArray = currentArray :+ item 
    yields += 1 
    Row(yields, item.toString(), currentArray) 
    } 
} 

但總體可變狀態和火花都沒有很好的搭配。