2017-09-27 42 views
0

我有幾個.mat文件(matlab),我想用PySpark處理。但我不知道如何平行進行。這是我希望並行化的基本單線程設置。該代碼將生成列表,其中,每個內部列表具有任意長度的列表:用Pyspark分析多個非文本文件

filenames = ['1.mat','2.mat',...] 
output_lists = [None]*len(filenames) # will be a list of lists 

for i,filename in enumerate(filenames): 
    output_lists[i] = analyze(filename) # analyze is some function that returns a list 

任何個人output_lists [I]可以適合在存儲器中,而是整個output_lists對象不能。我希望output_lists是一個rdd。

任何想法?我也很樂意使用pyspark和多處理模塊的組合。謝謝!

回答

0

其他答案看起來很優雅,但我不想安裝新的文件系統。我選擇與joblib模塊並行分析文件,將結果寫入.txt文件,並使用Spark打開.txt文件。

from joblib import Parallel, delayed 

def analyze(filename): 
    # write results to text file with name= filename+'.txt' 
    return 

filenames = ['1.mat', '2.mat', ...] 
Parallel(n_jobs=8)(delayed(analyze)(filename) for filename in filenames) 

然後我用Pyspark所有.txt文件讀取到一個RDD:

data = sc.textFile('path/*.txt') 
0
  • 在POSIX兼容的文件系統提檔,可從每名工人被訪問(NFS,MAPR文件系統,文件系統Databricks,頭孢)
  • 轉換路徑,使他們反映在文件系統路徑。
  • parallelize名稱:

    rdd = sc.parallelize(filenames) 
    
  • mapprocess

    result = rdd.map(analyze) 
    
  • 做任何你想要與結果的事情。