2017-08-24 58 views
1

我正在使用dask.distributed來實現各種數據處理管道。通常從S3讀取原始數據,最後處理(大)集合也將在S3上寫入CSV。將dask集合異步存儲到文件/ CSV

我可以異步運行處理並監視進度,但我注意到存儲集合到文件的所有to_xxx()方法似乎都是同步調用。其中一個缺點是電話會阻塞很長時間。其次,我不能輕易構建一個完整的圖形,以便稍後執行。

有沒有辦法運行例如to_csv()異步獲取未來的對象而不是阻塞? PS:我非常確定我可以自己實現異步存儲,例如,通過將collection轉換爲delayed()並存儲每個分區。但它似乎是一個常見的情況 - 除非我錯過了已有的功能,否則在框架中包含類似這樣的內容會很好。

回答

1

大多數to_*函數都有一個compute=True關鍵字參數,可以用compute=False替代。在這些情況下,它將返回一系列延遲值,然後可以異步計算

values = df.to_csv('s3://...', compute=False) 
futures = client.compute(values)