2015-08-17 31 views
1

我有這段代碼,我正在使用pysparkipython中讀取一個文件。我試圖做的是添加一個片段,它基於從文件中讀取的特定列形成列表,但是當我嘗試執行它時,列表變爲空,並且沒有任何內容被附加到它。我的代碼是:使用PySpark從地圖創建全局列表的問題

list1 = [] 

def file_read(line): 

    list1.append(line[10]) 
    # bunch of other code which process other column indexes on `line` 

inputData = sc.textFile(fileName).zipWithIndex().filter(lambda (line,rownum): rownum>0).map(lambda (line, rownum): line) 

column_val = (inputData 
    .map(lambda line: line.split(",")) 
    .filter(lambda line: len(line) >1) 
    .map(file_read)) 

當我執行的代碼,這部分list1還是對空,即使有,因爲我在上面的相同功能的代碼的其他部分使用它在line[10]的數據。看起來好像只是沒有將它追加到列表中。我如何形成上面的列表?

+0

你的例子很不完整,只能推測。 list1在其他地方被清除了嗎?您是否嘗試過在append之前/之後添加打印語句,打印list1以及正在追加的內容? – barny

+0

@barny list1未被其他地方清除。我試圖在'file_read()'函數內執行'print line [19]',但是當我調用它時什麼都不會打印 –

回答

3

嗯,它實際上附加到list1,問題不在於你正在考慮的問題。序列化封閉中引用的每個變量併發送給工作人員。它也適用於list1

每個分區都會收到它自己的list1副本,當file_read被調用時,數據將被追加到該副本中,並且當給定的映射階段結束時,它將超出範圍並被丟棄。

不是特別優雅的代碼,但你應該看到,這是真的這裏發生了什麼:

rdd = sc.parallelize(range(100), 5) 

line1 = [] 

def file_read(line): 
    list1.append(line) 
    print len(list1) 
    return line 

xs = rdd.map(file_read).collect() 

編輯

星火提供了兩種類型的共享變量。 Broadcast variables(僅從工作人員透視圖中讀取)和accumulators(僅從驅動程序透視圖中寫入)。

默認情況下,累加器僅支持數字變量,並且主要用作計數器。雖然可以定義自定義累加器。要做到這一點,你必須擴展AccumulatorParam類,並提供定製zeroaddInPlace實現:

class ListParam(AccumulatorParam): 
    def zero(self, v): 
     return [] 
    def addInPlace(self, acc1, acc2): 
     acc1.extend(acc2) 
     return acc1 

接下來,您可以重新定義file_read如下:

def file_read1(line): 
    global list1 # Required otherwise the next line will fail 
    list1 += [line] 
    return line 

用法示例:

list1 = sc.accumulator([], ListParam()) 

rdd = sc.parallelize(range(10)).map(file_read1).collect() 
list1.value 

即使如果可以像這樣使用蓄能器,那麼在實踐中使用它可能會很昂貴在最壞的情況下,它可能會導致駕駛員死亡。相反,你可以簡單地使用另一個轉變:

tmp = (inputData 
    .map(lambda line: line.split(",")) 
    .filter(lambda line: len(line) >1)) 

def line_read2(line): return ... # Just a core logic 

line1 = tmp.map(lambda line: line[10]) 
column_val = tmp.map(line_read2) 

旁註:您提供什麼都不做

代碼。 Spark中的轉換隻是描述了必須完成的工作,但在調用操作數據之前,沒有任何操作是真正執行的。