
This demonstrates losing workers over time. It is a follow-up to Dask losing workers over time.

Distributing graphs across cluster nodes

This example is not quite minimal, but it does give an idea of our typical work pattern. The sleep is necessary to trigger the problem: it occurs in the full application because large graphs have to be generated based on previous results.

When I run this on a cluster, I use dask-ssh to get 32 workers over 8 nodes (4 single-threaded processes per node):

dask-ssh --nprocs 4 --nthreads 1 --scheduler-port 8786 --log-directory `pwd` --hostfile hostfile.$JOBID & 
sleep 10 

The run should complete in less than about ten minutes with the full set of workers. I follow the execution on the diagnostics screen. In the events I see workers being added, but then I sometimes, though not always, see some workers being removed, usually leaving only those on the node hosting the scheduler.

""" Test to illustrate losing workers under dask/distributed. 

This mimics the overall structure and workload of our processing. 

Tim Cornwell 9 Sept 2017 
[email protected] 
""" 
import numpy 
from dask import delayed 
from distributed import Client 


# Make some randomly located points on 2D plane 
def init_sparse(n, margin=0.1): 
    numpy.random.seed(8753193) 
    return numpy.array([numpy.random.uniform(margin, 1.0 - margin, n),
                        numpy.random.uniform(margin, 1.0 - margin, n)]).reshape([n, 2])


# Put the points onto a grid and FFT, skip to save time 
def grid_data(sparse_data, shape, skip=100): 
    grid = numpy.zeros(shape, dtype='complex') 
    loc = numpy.round(shape * sparse_data).astype('int') 
    for i in range(0, sparse_data.shape[0], skip):
        # Index a single grid point; loc[i, :] alone would select whole rows
        grid[loc[i, 0], loc[i, 1]] = 1.0
    return numpy.fft.fft(grid).real 

# Accumulate all psfs into one psf 
def accumulate(psf_list): 
    lpsf = 0.0 * psf_list[0] 
    for p in psf_list:
        lpsf += p
    return lpsf 


if __name__ == '__main__': 
    import sys 
    import time 
    start=time.time() 

    # Process nchunks each of length len_chunk 2d points, making a psf of size shape 
    len_chunk = int(1e6) 
    nchunks = 16 
    shape=[512, 512] 
    skip = 100 

    # We pass in the scheduler from the invoking script 
    if len(sys.argv) > 1:
        scheduler = sys.argv[1]
        client = Client(scheduler)
    else:
        client = Client()

    print("On initialisation", client) 

    sparse_graph = [delayed(init_sparse)(len_chunk) for i in range(nchunks)] 
    sparse_graph = client.compute(sparse_graph, sync=True) 
    print("After first sparse_graph", client) 

    xfr_graph = [delayed(grid_data)(s, shape=shape, skip=skip) for s in sparse_graph] 
    xfr = client.compute(xfr_graph, sync=True) 
    print("After xfr", client) 

    tsleep = 120.0 
    print("Sleeping now for %.1f seconds" % tsleep) 
    time.sleep(tsleep) 
    print("After sleep", client) 

    sparse_graph = [delayed(init_sparse)(len_chunk) for i in range(nchunks)] 
    # sparse_graph = client.compute(sparse_graph, sync=True) 
    xfr_graph = [delayed(grid_data)(s, shape=shape, skip=skip) for s in sparse_graph] 
    psf_graph = delayed(accumulate)(xfr_graph) 
    psf = client.compute(psf_graph, sync=True) 

    print("*** Successfully reached end in %.1f seconds ***" % (time.time() - start)) 
    print(numpy.max(psf)) 
    print("After psf", client) 

    client.shutdown() 
    exit() 

Grep'ing for Client in a typical run shows:

On initialisation <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16> 
After first sparse_graph <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16> 
After xfr <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16> 
After sleep <Client: scheduler='tcp://sand-8-17:8786' processes=4 cores=4> 
After psf <Client: scheduler='tcp://sand-8-17:8786' processes=4 cores=4> 

Thanks, Tim


We are still struggling with this. We see:

[worker openhpc-compute-1]: distributed.core - WARNING - Event loop was unresponsive for 1.12s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
[scheduler openhpc-compute-0:8786]: distributed.scheduler - INFO - Worker 'tcp://10.60.253.19:39025' failed from closed comm: TimeoutError: [Errno 110] Connection timed out
[scheduler openhpc-compute-0:8786]: distributed.scheduler - INFO - Remove worker tcp://10.60.253.19:39025

Answer


It's not entirely clear why this works, but it does. We were using dask-ssh but needed more control over how the workers are created. Eventually we settled on:

scheduler=$(head -1 hostfile.$JOBID) 
hostIndex=0 
for host in `cat hostfile.$JOBID`; do 
    echo "Working on $host ...." 
    if [ "$hostIndex" = "0" ]; then 
     echo "run dask-scheduler" 
     ssh $host dask-scheduler --port=8786 & 
     sleep 5 
    fi 
    echo "run dask-worker" 
    ssh $host dask-worker --host ${host} --nprocs NUMBER_PROCS_PER_NODE \
        --nthreads NUMBER_THREADS \
        --memory-limit 0.25 --local-directory /tmp $scheduler:8786 &
    sleep 1 
    hostIndex="1" 
done 
echo "Scheduler and workers now running" 