2017-06-07 51 views
1

我有一個示例數據集存在於我的本地,我試圖做一些羣集上的基本操作。錯誤 - 來自工作者的錯誤沒有這樣的文件或目錄:'文件路徑'

import dask.dataframe as ddf 
    from dask.distributed import Client 
    client = Client('Ip address of the scheduler') 
    import dask.dataframe as ddf 
    csvdata = ddf.read_csv('Path to the CSV file') 

客戶端被連接到又連接到兩個工人(在其他機器上)的調度器。

我的問題可能很微不足道。

  1. 這個csv文件應該存在於其他worker節點上嗎?

    我似乎得到文件未找到錯誤。

  2. 使用,

    futures=client.scatter(csvdata) 
    x = ddf.from_delayed([future], meta=df) 
    #Price is a column in the data 
    df.Price.sum().compute(get=client.get) #returns" dd.Scalar<series-..., dtype=float64>" How do I access it? 
    client.submit(sum, x.Price) #returns "distributed.utils - ERROR - 6dc5a9f58c30954f77913aa43c792cc8" 
    

而且,我也請參閱本 Loading local file from client onto dask distributed clusterhttp://distributed.readthedocs.io/en/latest/manage-computation.html

我想我是混合了很多東西在這裏,我的理解是糊塗了。 任何幫助將非常感激。

回答

1

是的,這裏dask.dataframe假定您的客戶代碼中引用的文件也可以由您的工作人員訪問。如果情況並非如此,那麼您需要在本地機器上明確讀入數據,並將其分散給工人。

它看起來像你試圖做到這一點,除了你散射dask數據框而不是熊貓數據框。在分散它之前,您實際上必須具體加載來自磁盤的熊貓數據。如果您的數據裝入內存,那麼你應該能夠做到你現在在做什麼,但更換dd.read_csv呼叫與pd.read_csv

csvdata = pandas.read_csv('Path to the CSV file') 
[future] = client.scatter([csvdata]) 
x = ddf.from_delayed([future], meta=df).repartition(npartitions=10).persist() 
#Price is a column in the data 
df.Price.sum().compute(get=client.get) # Should return an integer 

如果你的數據太大,那麼你可以考慮使用本地dask可以逐個讀取並散佈數據到您的集羣。

import dask.dataframe as dd 
ddf = dd.read_csv('filename') 
futures = ddf.map_partitions(lambda part: c.scatter([part])[0]).compute(get=dask.get) # single threaded local scheduler 

ddf = dd.from_delayed(list(futures), meta=ddf.meta) 
相關問題