2016-11-29 112 views
2

爲什麼我下面用pyspark寫的計數器並不總是給我提供正確的結果,它與全球計數器有關嗎?pyspark全球櫃檯

def increment_counter(): 
    global counter 
    counter += 1 

def get_number_of_element(rdd): 
    global counter 
    counter = 0 
    rdd.foreach(lambda x:increment_counter()) 
    return counter 

回答

4

您的全局變量只在驅動程序節點上定義,這意味着它將正常工作,直到您在本地主機上運行。 只要您將作業分配給多個進程,他們將無法訪問變量counter,只需在其自己的進程中創建一個新的變量。所以最終的結果將只包含在驅動程序進程中完成的增量。

儘管如此,您正在尋找的是一個相當常見的用法,並且由Spark的累加器功能覆蓋。在過程結束時分配和收集累加器,因此總計將包含所有節點的增量,而不是僅包含驅動程序節點。

Accumulators - Spark Programming Guide

+0

太好了!非常感謝! – xxx222