2015-11-06 38 views
1

我遍歷文件來收集有關字典中的列和行中的值的信息。我有以下代碼在本地工作:如何解決pyspark中的pickle錯誤?

def search_nulls(file_name): 
    separator = ',' 
    nulls_dict = {} 
    fp = open(file_name,'r') 
    null_cols = {} 
    lines = fp.readlines() 

    for n,line in enumerate(lines): 
     line = line.split(separator) 
     for m,data in enumerate(line): 
      data = data.strip('\n').strip('\r') 
      if str(m) not in null_cols: 
       null_cols[str(m)] = defaultdict(lambda: 0) 
      if len(data) <= 4: 
       null_cols[str(m)][str(data)] = null_cols[str(m)][str(data)] + 1 

    return null_cols 


files_to_process = ['tempfile.csv'] 
results = map(lambda file: search_nulls(file), files_to_process) 

上面的代碼工作正常,沒有火花。 我評論的最後兩行以上,我嘗試用火花,因爲這是一些原型,將需要運行分佈:

os.environ['SPARK_HOME'] = <path_to_spark_folder> 
conf = SparkConf().setAppName("search_files").setMaster('local') 

sc = SparkContext(conf=conf) 

objects = sc.parallelize(files_to_process) 
resulting_object = \ 
    objects.map(lambda file_object: find_nulls(file_object)) 

result = resulting_object.collect() 

使用的火花,不過,這將導致以下錯誤:

File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream 
    bytes = self.serializer.dumps(vs) 
    File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps 
    return pickle.dumps(obj, protocol) 
TypeError: expected string or Unicode object, NoneType found​ 

我一直無法找到任何明顯的原因,爲什麼這會失敗,因爲它在本地完美運行,並且我沒有在工作節點間共享任何文件。實際上,我只是在本地機器上運行它。

有誰知道爲什麼這可能會失敗的一個很好的理由?

回答

5

你的問題的根源是以下行:

null_cols[str(m)] = defaultdict(lambda: 0) 

正如你可以在the pickle module documentationWhat can be pickled and unpickled?部分閱讀:

The following types can be pickled:

  • ...
  • functions defined at the top level of a module (using def, not lambda)
  • built-in functions defined at the top level of a module
  • ...

應該明確的是,lambda: 0不符合以上標準。爲了使它工作,你可以例如用int替換lambda表達式:

null_cols[str(m)] = defaultdict(int) 

這怎麼可能,我們可以通過lambda表達式中PySpark高階功能呢?魔鬼在細節中。 PySpark根據上下文使用不同的序列化器。要序列化閉包,包括lambda表達式,它使用定製的cloudpickle,它支持lambda表達式和嵌套函數。爲了處理數據,它使用默認的Python工具。


一些旁註:

  • 我不會使用Python file對象讀取數據。它不可移植,不會超出本地文件系統。您可以改用SparkContex.wholeTextFiles
  • 如果確實關閉連接。使用with聲明通常是最好的方法
  • 你可以放心地剝離換行符拆分前行
+0

所以,我只想澄清,一般來說,可以在本地序列化'lambda'功能應該能夠被「pyspark」序列化?瞭解這一點對於本地測試的目的是有用的。感謝你對這個問題的堅持。 – Sother

+0

大部分時間是的。你必須考慮事情發生的時間和地點,一般來說我不會濫用lambda。幾乎所有的常見操作都可以使用內置函數來執行,而沒有靜態類型,就會出現錯誤,本質上不可測試,並且令人驚訝的是冗長。 – zero323

相關問題