我使用LongAccumulator
作爲地圖操作中的共享計數器。但似乎我沒有正確使用它,因爲工作節點上的計數器狀態未更新。下面是我的櫃檯類的樣子:工作節點爲什麼沒有看到另一個工作節點上的累加器更新?
public class Counter implements Serializable {
private LongAccumulator counter;
public Long increment() {
log.info("Incrementing counter with id: " + counter.id() + " on thread: " + Thread.currentThread().getName());
counter.add(1);
Long value = counter.value();
log.info("Counter's value with id: " + counter.id() + " is: " + value + " on thread: " + Thread.currentThread().getName());
return value;
}
public Counter(JavaSparkContext javaSparkContext) {
counter = javaSparkContext.sc().longAccumulator();
}
}
至於當應用程序在多個工作節點中運行的我能理解的文檔這應該正常工作:
蓄電池是變量,這些變量只有「添加「到通過關聯和交換操作,因此可以有效地支持並行。它們可以用來實現計數器(如在MapReduce中)或者和。 Spark本身支持數字類型的累加器,程序員可以添加對新類型的支持。
但這裏是在計數器上的2名不同的工人增加的結果,因爲它看起來像的狀態不是節點之間共享:
信息計數器:id爲遞增計數器:866上線程:執行程序任務啓動worker-6 INFO計數器:計數器的值爲:866爲:線程上的1:執行程序任務啓動worker-6
INFO計數器:在線程上遞增計數器:id:866執行程序任務啓動worker-0 INFO計數器:ID爲866的計數器值爲:1在線程上:執行程序任務啓動worker-0
我理解累加器構想錯誤還是有任何設置我必須開始執行任務?