2016-01-26 35 views
2

Spark 1.5.1 + Java 1.8如何攔截驅動程序上累加器的部分更新?

我們正在使用spark將固體記錄上傳到數據庫。

動作代碼看起來是這樣的:

rdd.foreachPartition(new VoidFunction<Iterator<T>>() { 

    @Override 
    public void call(Iterator<T> iter) { 
      //while there are more records perform the following every 1000 records 
      //int[] recoords = statement.executeBatch(); 
      //accumulator.add(recoords.length); 
    } 
    // ... 
} 

在驅動器節點有一個監視累加器值的線程。但是,該值不會更新。它只會在應用程序結束時更新一次。即使累加器使用延遲值設置,它應該正確更新,因爲我正在驅動程序節點線程中定期讀取值。

我是否錯誤地使用了累加器?無論如何,我可以更持續地監控我的工人的進展情況嗎?

回答

2

您可以監視累加器的值,但不能連續完成,即更新在任務完成後發生。

雖然累加器被稱爲共享變量,但並不真正共享。每個任務都有自己的累加器,在任務完成後合併。這意味着任務運行時無法更新全局值。

爲了能夠看到更新,執行程序的數量必須少於已處理分區的數量(對應於任務的數量)。其原因是當累加器更新發送給驅動程序時引入「障礙」。

例如:

import org.apache.spark.{SparkConf, SparkContext} 

object App { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setMaster("local[4]") 
    val sc = new SparkContext(conf) 

    val accum = sc.accumulator(0, "An Accumulator") 
    val rdd = sc.parallelize(1 to 1000, 20) 

    import scala.concurrent.duration._ 
    import scala.language.postfixOps 
    import rx.lang.scala._ 

    val o = Observable.interval(1000 millis).take(1000) 
    val s = o.subscribe(_ => println(accum.value)) 
    rdd.foreach(x => { 
     Thread.sleep(x + 200) 
     accum += 1 
    }) 
    s.unsubscribe 
    sc.stop 
    } 
} 

正如你可以看到全局值每項任務只更新一次。

如果您按照提供的示例創建了命名累加器,那麼您還可以使用Spark UI來監視它的狀態。只需打開階段選項卡,導航到特定階段並檢查累加器部分。

有沒有反正我可以更持續地監控我的工人的進度?

最可靠的方法是通過添加更多的分區來增加粒度,但它並不便宜。

+0

_「雖然累加器被稱爲共享變量,但並不真正共享。」_ - 它們在工作人員和驅動程序之間共享。 –

+0

@JacekLaskowski我認爲這取決於你如何定義共享。恕我直言,呼籲國家不能被觀察到,並且永遠不會傳播給工人是一種虐待。順便說一句,感謝您的編輯。 – zero323