2017-05-07 98 views
2

我有一個HDF5文件,我想加載到Dask DataFrame的列表中。我已經在Dask pipeline approach的縮寫版本後面使用了一個循環來設置它。下面是代碼:將HDF文件加載到Python Dask DataFrame的列表中

import pandas as pd 
from dask import compute, delayed 
import dask.dataframe as dd 
import os, h5py 

@delayed 
def load(d,k): 
    ddf = dd.read_hdf(os.path.join(d,'Cleaned.h5'), key=k) 
    return ddf 

if __name__ == '__main__':  
    d = 'C:\Users\User\FileD' 
    loaded = [load(d,'/DF'+str(i)) for i in range(1,10)] 

    ddf_list = compute(*loaded) 
    print(ddf_list[0].head(),ddf_list[0].compute().shape) 

我收到此錯誤信息:

C:\Python27\lib\site-packages\tables\group.py:1187: UserWarning: problems loading leaf ``/DF1/table``:: 

    HDF5 error back trace 

    File "..\..\hdf5-1.8.18\src\H5Dio.c", line 173, in H5Dread 
    can't read data 
    File "..\..\hdf5-1.8.18\src\H5Dio.c", line 543, in H5D__read 
    can't initialize I/O info 
    File "..\..\hdf5-1.8.18\src\H5Dchunk.c", line 841, in H5D__chunk_io_init 
    unable to create file chunk selections 
    File "..\..\hdf5-1.8.18\src\H5Dchunk.c", line 1330, in H5D__create_chunk_file_map_hyper 
    can't insert chunk into skip list 
    File "..\..\hdf5-1.8.18\src\H5SL.c", line 1066, in H5SL_insert 
    can't create new skip list node 
    File "..\..\hdf5-1.8.18\src\H5SL.c", line 735, in H5SL_insert_common 
    can't insert duplicate key 

End of HDF5 error back trace 

Problems reading the array data. 

The leaf will become an ``UnImplemented`` node. 
    % (self._g_join(childname), exc)) 

消息提到了一個重複鍵。我迭代了前9個文件來測試代碼,並且在循環中,我使用每次迭代來組裝與dd.read_hdf一起使用的不同密鑰。在所有迭代中,我保持文件名是相同的 - 只有密鑰正在改變。

我需要使用dd.concat(list,axis=0,...)才能垂直連接文件的內容。我的方法是首先將它們加載到列表中,然後將它們連接起來。

我已經安裝了PyTablesh5Py並且有Dask版本0.14.3+2

隨着熊貓0.20.1,我似乎得到這個工作:

for i in range(1,10): 
    hdf = pd.HDFStore(os.path.join(d,'Cleaned.h5'),mode='r') 
    df = hdf.get('/DF{}' .format(i)) 
    print df.shape 
    hdf.close() 

有沒有一種方法可以讓我這個HDF5文件加載到DASK DataFrames的名單?還是有另一種方法將它們垂直連接在一起?

回答

3

Dask.dataframe已經很懶,所以不需要使用dask.delayed來使它更加懶惰。您可以反覆撥打dd.read_hdf

ddfs = [dd.read_hdf(os.path.join(d,'Cleaned.h5'), key=k) 
     for k in keys] 

ddf = dd.concat(ddfs) 
+0

我錯過了。謝謝! –

+0

是否可以在同一管道中使用混合'延遲'和'非延遲'功能? –

+1

請參閱[這些文檔](http://dask.pydata.org/en/latest/delayed-collections.html)瞭解如何在延遲值和dask.dataframes之間進行轉換。沒有理由在惰性函數中嵌套懶惰函數。 – MRocklin

相關問題