2017-09-22 77 views
2

多個目錄中讀取多個拼花文件(同模式)需要使用DASK用相同的架構多拼花文件加載到一個單一的數據幀。當它們全都在同一個目錄中時,這種方式起作用,但當它們位於不同的目錄中時不起作用。我如何從DASK/fastparquet

例如:

import fastparquet 
pfile = fastparquet.ParquetFile(['data/data1.parq', 'data/data2.parq']) 

作品就好了,但如果我複製data2.parq到不同的目錄,下面不工作:

pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq']) 

回溯我得到的是以下幾點:

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 
<ipython-input-11-b3d381f14edc> in <module>() 
----> 1 pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq']) 

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep) 
    82   if isinstance(fn, (tuple, list)): 
    83    basepath, fmd = metadata_from_many(fn, verify_schema=verify, 
---> 84            open_with=open_with) 
    85    self.fn = sep.join([basepath, '_metadata']) # effective file 
    86    self.fmd = fmd 

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with) 
    164  else: 
    165   raise ValueError("Merge requires all PaquetFile instances or none") 
--> 166  basepath, file_list = analyse_paths(file_list, sep) 
    167 
    168  if verify_schema: 

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in analyse_paths(file_list, sep) 
    221  if len({tuple([p.split('=')[0] for p in parts[l:-1]]) 
    222    for parts in path_parts_list}) > 1: 
--> 223   raise ValueError('Partitioning directories do not agree') 
    224  for path_parts in path_parts_list: 
    225   for path_part in path_parts[l:-1]: 

ValueError: Partitioning directories do not agree 

使用dask.dataframe.read_parquet時出現同樣的錯誤,我想使用相同的ParquetFile對象。

如何從不同的目錄中加載多個文件?把我需要加載到同一個目錄下的所有文件都不是一個選項。

回答

3

這樣確實可以在fastparquet上的主人,如果使用的是絕對路徑或明確的相對路徑:

pfile = fastparquet.ParquetFile(['./data/data1.parq', './data2/data2.parq']) 

的需求爲主導./應被視爲一個錯誤 - 看到這個問題。

2

一種解決方法。將分別讀取每個塊,並傳遞給dask.dataframe.from_delayed。這不正是做處理該read_parquet做(以下'index'應該是指數)相同的元數據,但在其他方面應該工作。

import dask.dataframe as dd  
from dask import delayed  
from fastparquet import ParquetFile 

@delayed 
def load_chunk(pth): 
    return ParquetFile(pth).to_pandas() 

files = ['temp/part.0.parquet', 'temp2/part.1.parquet'] 
df = dd.from_delayed([load_chunk(f) for f in files]) 

df.compute() 
Out[38]: 
    index a 
0  0 1 
1  1 2 
0  2 3 
1  3 4 
+0

GitHub的問題 - https://github.com/dask/fastparquet/issues/217 – chrisb