2017-08-25 63 views
0

假設我有一個熊貓數據幀稱爲df應用用戶定義的函數的PySpark數據幀,並返回一個字典

id value1 value2 
1 2 1 
2 2 1 
3 4 5 

在普通的Python,我寫了一個函數來處理這個數據幀,並返回一個字典:

d = dict() 
for row in df.itertuples() 
    x = do_something (row) 
    d[x[0]] = x[1:] 

我試圖用Spark重新實現這個函數。

d = dict() # define a global var 
def do_something (id, value1, value2): 
    # business logic 
    d[x0] = [x1,x2,x3] 
    return 0 
udf_do = udf (do_something) 

則:

df_spark.select (udf_do ('id','value1','value2'))

我的想法是,通過調用df_spark.select,功能do_something將被調用了數據幀,並且將更新全局變量d。我並不在意udf_do的返回值,所以我返回0.

我的解決方案確實無法正常工作。

你能否建議我通過一些方法來迭代(我知道它不是一個Spark方式)或以某種方式來處理Spark數據框和更新外部字典?

請注意,數據幀非常大。我試圖通過調用toPandas()將其轉換爲熊貓,但我有OOM問題。

+0

似乎您正在尋找類似的東西[answer](https://stackoverflow.com/a/45881682/8240561) – Prem

回答

0

UDF無法更新任何全局狀態。但是,您可以在UDF內部進行一些業務登錄,然後使用toLocalIterator以內存有效的方式將所有數據傳送到驅動程序(按分區分區)。例如:

df = spark.createDataFrame([(10, 'b'), (20, 'b'), (30, 'c'), 
          (40, 'c'), (50, 'c'), (60, 'a')], ['col1', 'col2']) 
df.withColumn('udf_result', ......) 
df.cache() 
df.count() # force cache fill 

for row in df.toLocalIterator(): 
    print(row) 
相關問題