2015-11-19 84 views
3

我試圖通過酸洗它來序列化Spark RDD,並將pickled文件直接讀入Python。酸洗Spark Spark RDD並將它讀入Python

a = sc.parallelize(['1','2','3','4','5']) 
a.saveAsPickleFile('test_pkl') 

然後我將test_pkl文件複製到我的本地。我如何直接將它們讀入Python?

pickle.load(open('part-00000','rb')) 

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/lib64/python2.6/pickle.py", line 1370, in load 
    return Unpickler(file).load() 
    File "/usr/lib64/python2.6/pickle.py", line 858, in load 
    dispatch[key](self) 
    File "/usr/lib64/python2.6/pickle.py", line 970, in load_string 
    raise ValueError, "insecure string pickle" 
ValueError: insecure string pickle 

我認爲火花采用酸洗法比蟒蛇鹹菜方法不同(正確的是:當我嘗試正常鹹菜包,當我試圖讀取「test_pkl」的第一個泡菜部分失敗我如果我錯了)。有什麼辦法讓我從Spark中醃製數據,並從文件中直接將這個pickle對象讀入python中?

+1

問題是,它不是一個鹹菜文件,而是一個[SequenceFile(https://wiki.apache.org/hadoop/SequenceFile)含有醃對象,我不知道有任何積極發展解析器用於Python中的SequenceFiles。 – zero323

回答

1

一個更好的方法可能是酸洗數據在每個分區中,對其進行編碼,並將其寫入到一個文本文件:

import cPickle 
import base64 

def partition_to_encoded_pickle_object(partition): 
    p = [i for i in partition] # convert the RDD partition to a list 
    p = cPickle.dumps(p, protocol=2) # pickle the list 
    return [base64.b64encode(p)] # base64 encode the list, and return it in an iterable 

my_rdd.mapPartitions(partition_to_encoded_pickle_object).saveAsTextFile("your/hdfs/path/") 

你的文件(S)下載到本地目錄後,就可以使用下面的代碼段來讀取它:

# you first need to download the file, this step is not shown 
# afterwards, you can use 
path = "your/local/path/to/downloaded/files/" 
data = [] 
for part in os.listdir(path): 
    if part[0] != "_": # this prevents system generated files from getting read - e.g. "_SUCCESS" 
     data += cPickle.loads(base64.b64decode((open(part,'rb').read()))) 
+0

這裏唯一的問題是加載部分需要將所有數據加載到'data'內存中,而這可能並不總是可能的。 – Tgsmith61591

+0

@ Tgsmith61591正確 - 如果您在單臺計算機上讀取數據,則通常無法讀取羣集中的所有數據。要解決這個問題,您可能需要從文件中過濾/縮小/提取所需的數據,例如'data + = some_filter_fx(cPickle.loads(base64.b64decode((open(part,'rb')。read()))))'' – mgoldwasser

1

問題是格式不是一個pickle文件。它是一個SequenceFile的酸漬objectssequence file可以在Hadoop和Spark環境中打開,但不打算在python中使用,並使用基於JVM的序列化進行序列化,在這種情況下是字符串列表。

1

可以使用sparkpickle項目。就這麼簡單

with open("/path/to/file", "rb") as f: 
    print(sparkpickle.load(f))