2017-09-03

I am making good progress with Dask.delayed. As a team we have decided to spend more time using Dask for our graphs, distributing the graphs across the cluster nodes.

I have a question about distribution. I see the following behaviour on the cluster. I start with 8 workers, one on each of 8 nodes, each worker with 4 threads, and then client.compute 8 graphs that create simulation data for subsequent processing. I would like the 8 data sets to be created one per node. What actually happens, not unreasonably, is that the eight functions run on the first two nodes, and subsequent computation also runs on those first two nodes. Hence I see a lack of scaling. Over time the other nodes disappear from the diagnostics workers page. Is this expected?
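For context, the submission pattern is roughly the following (a minimal sketch rather than the real code: create_simulation_data is a hypothetical stand-in for our data-creation step, and the scheduler address is taken from the log further down):

import numpy as np
from dask import delayed
from dask.distributed import Client

client = Client("tcp://10.143.6.70:8786")   # scheduler address, as in the log below

def create_simulation_data(i):
    # hypothetical stand-in for the real data-creation function
    return np.random.random((1000, 1000)) + i

# One graph per intended node. The scheduler is free to place each task anywhere,
# and in practice the tasks end up concentrated on the first one or two nodes.
graph_list = [delayed(create_simulation_data)(i) for i in range(8)]
results = client.compute(graph_list, sync=True)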

So I wanted to distribute the data-creation functions across the nodes explicitly. When I want to compute the graphs, I now do:

if nodes is not None: 
    # Restrict computation of the graphs to the given worker nodes
    print("Computing graph_list on the following nodes: %s" % nodes) 
    return client.compute(graph_list, sync=True, workers=nodes, **kwargs) 
else: 
    return client.compute(graph_list, sync=True, **kwargs) 

This seems to be set up correctly: the diagnostics progress bar shows that my data-creation functions are in memory, but they never start. If nodes is omitted, the computation proceeds as expected. This behaviour occurs both on the cluster and on my desktop.
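One diagnostic worth running (a sketch, not part of the original post) is to compare the values passed as workers= with the addresses the scheduler has actually registered, since a restriction that matches no registered worker leaves the tasks waiting indefinitely:

# `client` and `nodes` as above
info = client.scheduler_info()
registered = list(info["workers"].keys())    # e.g. 'tcp://10.143.6.73:36810'
print("Registered workers:", registered)
print("Requested nodes:    ", nodes)
# workers= has to identify registered workers (by address or, depending on how the
# workers were started, by hostname/alias); if none of `nodes` matches, nothing starts.

If the registered addresses are raw IPs while nodes contains hostnames, that mismatch would explain tasks sitting idle, and would be consistent with the name-to-IP workaround mentioned in the comments below.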

More information: looking at the scheduler log, I do see communication failures.

more dask-ssh_2017-09-04_09\:52\:09/dask_scheduler_sand-6-70\:8786.log 
distributed.scheduler - INFO - ----------------------------------------------- 
distributed.scheduler - INFO - Scheduler at: tcp://10.143.6.70:8786 
distributed.scheduler - INFO -  bokeh at:    0.0.0.0:8787 
distributed.scheduler - INFO -  http at:    0.0.0.0:9786 
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-ny4ev7qh 
distributed.scheduler - INFO - ----------------------------------------------- 
distributed.scheduler - INFO - Register tcp://10.143.6.73:36810 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.73:36810 
distributed.scheduler - INFO - Register tcp://10.143.6.71:46656 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.71:46656 
distributed.scheduler - INFO - Register tcp://10.143.7.66:42162 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.66:42162 
distributed.scheduler - INFO - Register tcp://10.143.7.65:35114 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.65:35114 
distributed.scheduler - INFO - Register tcp://10.143.6.70:43208 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.70:43208 
distributed.scheduler - INFO - Register tcp://10.143.7.67:45228 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.67:45228 
distributed.scheduler - INFO - Register tcp://10.143.6.72:36100 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.72:36100 
distributed.scheduler - INFO - Register tcp://10.143.7.68:41915 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.68:41915 
distributed.scheduler - INFO - Receive client connection: 5d1dab2a-914e-11e7-8bd1-180373ff6d8b 
distributed.scheduler - INFO - Worker 'tcp://10.143.6.71:46656' failed from closed comm: Stream is closed 
distributed.scheduler - INFO - Remove worker tcp://10.143.6.71:46656 
distributed.scheduler - INFO - Removed worker tcp://10.143.6.71:46656 
distributed.scheduler - INFO - Worker 'tcp://10.143.6.73:36810' failed from closed comm: Stream is closed 
distributed.scheduler - INFO - Remove worker tcp://10.143.6.73:36810 
distributed.scheduler - INFO - Removed worker tcp://10.143.6.73:36810 
distributed.scheduler - INFO - Worker 'tcp://10.143.6.72:36100' failed from closed comm: Stream is closed 
distributed.scheduler - INFO - Remove worker tcp://10.143.6.72:36100 
distributed.scheduler - INFO - Removed worker tcp://10.143.6.72:36100 
distributed.scheduler - INFO - Worker 'tcp://10.143.7.67:45228' failed from closed comm: Stream is closed 
distributed.scheduler - INFO - Remove worker tcp://10.143.7.67:45228 
distributed.scheduler - INFO - Removed worker tcp://10.143.7.67:45228 
(arlenv) [[email protected] performance]$ 

Does this suggest any possible cause?

Thanks, Tim

Answer


How Dask chooses to allocate tasks to workers is complex, taking into account many issues such as load balancing, data transfer, resource constraints, and so on. It can be hard to reason about where things will end up without a concrete and simple example.

One thing you can try is to submit all of your computations at once; this lets the scheduler make slightly more intelligent decisions, rather than seeing things one at a time.

So, you might try replacing code like this:

futures = [client.compute(delayed_value) for delayed_value in L] 
wait(futures) 

with code like this:

futures = client.compute(L) 
wait(futures) 

But honestly I'd only give this a 30% chance of solving your problem. It's hard to know what is going on without digging into the problem much more deeply. If you can provide a simple, reproducible code example, that would be best.


To get the allocation to work, I had to translate from node names to IP addresses.
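The comment above refers to translating node hostnames into the IP-based addresses the scheduler registers. A rough illustration of one way to do that, with hypothetical hostnames and the client from the question:

import socket

node_names = ["sand-6-70", "sand-6-71"]      # hypothetical node hostnames
node_ips = {name: socket.gethostbyname(name) for name in node_names}

# Match each node's IP against the worker addresses registered with the scheduler,
# then pass the resulting addresses as the workers= argument to client.compute.
workers_by_node = {
    name: [addr for addr in client.scheduler_info()["workers"] if ip in addr]
    for name, ip in node_ips.items()
}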


My problem undersubscribes memory, so that even if all the data were moved to one node (out of 16) it would still be only 25% full. So, starting from 8 nodes / 16 threads and only a few percent of memory in use, it ends up shrunk down onto a single worker. I need to think more about how to proceed.


I don't think this has anything to do with memory size. Look at the scheduler log.