2017-07-26 81 views
2

我希望提交一個DASK的任務,將做到以下幾點:DASK計算與子期貨

  1. 使用dask.bag(def fakejob
  2. 計算圖表建立一個懶DASK圖表從1和保存它要實木複合地板(留下這部分,只是一個動機)

我需要爲多個輸入做到這一點,所以我一直在嘗試使用dask.distributed的期貨功能。

from dask.distributed import Client 

client = Client(processes=True) 

def fakejob(path): 
    return (
     dask.bag 
     .read_text(path) 
     .to_dataframe() 
    ) 

futures = client.map(fakejob, [input_path1, input_path2]) 

問題是我不斷收到:AssertionError: daemonic processes are not allowed to have children

我試過以下this link並結束了第二個版本(均不同於第一1號線),但期貨留「待定」永遠。

from dask.distributed import Client 

client = Client(processes=True) 

def fakejob(path): 
    with dask.set_options(get=client.get): 
     return (
      dask.bag 
      .read_text(path) 
      .to_dataframe() 
     ) 

futures = client.map(fakejob, [input_path1, input_path2]) 

任何關於如何做到這一點的線索?

乾杯。

回答

2

奇怪的和稍微巨大的錯誤信息來自於嘗試構建工作進程內的dask圖(這是一個包),如果用client.map調用,事情最終會結束。如果您可以將整個工作流程放在該功能中,包括寫入實木複合地板,並且不嘗試將包裹傳遞給呼叫者,則您的第二次嘗試將與本地客戶端一起工作。

解決方案更簡單。

bags = [dask.bag.read_text(path) 
     .to_dataframe() for path in [input_path1, input_path2]) 
futures = client.compute(bags) # run in background on the cluster 
client.gather(futures) # wait and get results 

這裏,bags是DASK袋,即工作任務定義的列表,但還沒有運行。您可以用dask.compute(*bags)取代最後兩行,以獲得結果而不用擔心期貨。

+0

嗨!感謝您的回覆:) 您發佈的解決方案確實更簡單,但我認爲它不能解決我的問題。 'bags = ...'中的列表理解將按順序構建每個圖形,對嗎?我實際上希望圖的構建也是平行的。換句話說,我希望列表理解能夠並行發生。 有道理,還是我弄錯了什麼? –

+0

正確,列表的建立是在本地線程中的串行 - 但它對你來說很慢嗎?我認爲唯一的選擇是使用'dask.delayed'和'db.from_delayed',但是這是包在內部在字節塊上做的事情;花時間唯一需要的是找到每個文件的大小。 – mdurant

+0

我明白了。實際上需要一段時間。我正在數據科學團隊辦公室與dask一起進行實驗。我使用'dask.bag'來解析大約100GB /天的json數據,並將其轉換爲parquet進行非常具體的分析項目。它一直運行良好,但大約需要10分鐘才能構建圖的惰性對象。我們計劃進一步發展,這種開銷可能是未來的一個問題。現在你的建議是完美的。謝謝! –