2015-12-08 55 views
0

我想在集羣任務中訪問累加器的值。但是,當我這樣做,它拋出一個異常:如何訪問任務中累加器的值?

無法讀取累加器的值

我試圖用row.localValue但它返回相同的數字。有沒有解決方法?

private def modifyDataset(
    data: String, row: org.apache.spark.Accumulator[Int]): Array[Int] = { 

    var line = data.split(",") 
    var lineSize = line.size  
    var pairArray = new Array[Int](lineSize-1) 
    var a = row.value 
    paiArray(0)=a 

    row+=1 
    pairArray 

} 


var sc = Spark_Context.InitializeSpark 
var row = sc.accumulator(1, "Rows") 

var dataset = sc.textFile("path") 

var pairInfoFile = noHeaderRdd.flatMap{ data => modifyDataset(data,row) } 
    .persist(StorageLevel.MEMORY_AND_DISK)   
pairInfoFile.count() 

回答

0

這是根本不可能的,沒有解決方法。 Spark accumulators是工作人員視角下的只寫變量。在任務期間讀取其值的任何嘗試都沒有意義,因爲工作人員和本地累加器值之間沒有共享狀態,僅反映當前分區的狀態。

一般而言accumulators旨在主要用於診斷和不應該被用作應用程序邏輯的一部分。在轉換中使用時,唯一的保證是至少執行一次。

另見:How to print accumulator variable from within task (seem to "work" without calling value method)?

相關問題