2017-03-02 65 views




import numpy as np 
import pandas as pd 
import dask.dataframe as dd 

def func(data): 
    result = np.histogram(data.image.ravel(), bins=128)[0] 
    return result 

n = 10 
df = pd.DataFrame({'image': [(np.random.random((60, 24)) * 255).astype(np.uint8) for i in np.arange(n)], 
       'n1': np.arange(n), 
       'n2': np.arange(n) * 2, 
       'n3': np.arange(n) * 4 
print 'DataFrame\n', df 

hists = pd.Series([func(r[1]) for r in df.iterrows()]) 

# ddf = dd.from_pandas(df, npartitions=MAX_PROCESSORS) 
# hists = ddf.apply(func, axis=1, meta=pd.Series(name='data', dtype=np.ndarray)).compute() 

print 'Histograms \n', hists 


               image n1 n2 n3 
0 [[51, 254, 167, 61, 230, 135, 40, 194, 101, 24... 0 0 0 
1 [[178, 130, 204, 196, 80, 97, 61, 51, 195, 38,... 1 2 4 
2 [[122, 126, 47, 31, 208, 130, 85, 189, 57, 227... 2 4 8 
3 [[185, 141, 206, 233, 9, 157, 152, 128, 129, 1... 3 6 12 
4 [[131, 6, 95, 23, 31, 182, 42, 136, 46, 118, 2... 4 8 16 
5 [[111, 89, 173, 139, 42, 131, 7, 9, 160, 130, ... 5 10 20 
6 [[197, 223, 15, 40, 30, 210, 145, 182, 74, 203... 6 12 24 
7 [[161, 87, 44, 198, 195, 153, 16, 195, 100, 22... 7 14 28 
8 [[0, 158, 60, 217, 164, 109, 136, 237, 49, 25,... 8 16 32 
9 [[222, 64, 64, 37, 142, 124, 173, 234, 88, 40,... 9 18 36 
0 [81, 87, 80, 94, 99, 79, 86, 90, 90, 113, 96, ... 
1 [93, 76, 103, 83, 76, 101, 85, 83, 96, 92, 87,... 
2 [84, 93, 87, 113, 83, 83, 89, 89, 114, 92, 86,... 
3 [98, 101, 95, 111, 77, 92, 106, 72, 91, 100, 9... 
4 [95, 96, 87, 82, 89, 87, 99, 82, 70, 93, 76, 9... 
5 [77, 94, 95, 85, 82, 90, 77, 92, 87, 89, 94, 7... 
6 [73, 86, 81, 91, 91, 82, 96, 94, 112, 95, 74, ... 
7 [88, 89, 87, 88, 76, 95, 96, 98, 108, 96, 92, ... 
8 [83, 84, 76, 88, 96, 112, 89, 80, 93, 94, 98, ... 
9 [91, 78, 85, 98, 105, 75, 83, 66, 79, 86, 109,... 

可以看到註釋行,叫dask.DataFrame.apply。如果我已經註釋掉他們,我已經得到了異常dask.async.ValueError: Shape of passed values is (3, 128), indices imply (3, 4)


File "C:\Anaconda\envs\MBA\lib\site-packages\dask\base.py", line 94, in compute 
    (result,) = compute(self, traverse=False, **kwargs) 
    File "C:\Anaconda\envs\MBA\lib\site-packages\dask\base.py", line 201, in compute 
    results = get(dsk, keys, **kwargs) 
    File "C:\Anaconda\envs\MBA\lib\site-packages\dask\threaded.py", line 76, in get 
    File "C:\Anaconda\envs\MBA\lib\site-packages\dask\async.py", line 500, in get_async 
    raise(remote_exception(res, tb)) 
dask.async.ValueError: Shape of passed values is (3, 128), indices imply (3, 4) 




你有沒有考慮過使用[dask.delayed(http://dask.pydata.org/en/latest/delayed.html) ?這可能比使用數據框更直接。 – MRocklin


@MRocklin:還沒有。明天試試吧(今天是晚上8點30分) – wl2776


這個工作:'dh.r =(延遲(func,pure = True)(r [1])爲df.iterrows()]); hists = pd.Series(dhists.compute(get = dask.multiprocessing.get))' 但是,對數據幀的迭代仍然需要一段時間。 – wl2776




def func(data): 
    filtered = # filter data.image 
    result = np.histogram(filtered) 
    return result 

def func_partition(data, additional_args): 
    result = data.apply(func, args=(bsifilter,), axis=1) 
    return result 

if __name__ == '__main__': 
    n = 30000 
    df = pd.DataFrame({'image': [(np.random.random((180, 64)) * 255).astype(np.uint8) for i in np.arange(n)], 
        'n1': np.arange(n), 
        'n2': np.arange(n) * 2, 
        'n3': np.arange(n) * 4 

    ddf = dd.from_pandas(df, npartitions=MAX_PROCESSORS) 
    dhists = ddf.map_partitions(func_partition, bfilter, meta=pd.Series(dtype=np.ndarray)) 
    print 'Delayed dhists = \n', dhists 
    hists = pd.Series(dhists.compute())