This is a demonstration of losing workers over time. It is a follow-up to "Dask losing workers over time".
Distributing graphs to across cluster nodes
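The example is not quite minimal, but it does give an idea of our typical work pattern. The sleep is necessary to trigger the problem. In the full application the sleep happens naturally, because we need to generate a large graph based on previous results.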
When I run this on a cluster, I use dask-ssh over 8 nodes to get 32 workers:
dask-ssh --nprocs 4 --nthreads 1 --scheduler-port 8786 --log-directory `pwd` --hostfile hostfile.$JOBID &
sleep 10
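With the full set of workers this should run in less than about 10 minutes. I follow the execution on the diagnostics screen. Under Events, I see the workers being added, but sometimes (not always) I then see a number of workers being removed, usually leaving only the workers on the node hosting the scheduler.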
""" Test to illustrate losing workers under dask/distributed.
This mimics the overall structure and workload of our processing.
Tim Cornwell 9 Sept 2017
[email protected]
"""
import numpy
from dask import delayed
from distributed import Client


# Make some randomly located points on 2D plane
def init_sparse(n, margin=0.1):
    numpy.random.seed(8753193)
    return numpy.array([numpy.random.uniform(margin, 1.0 - margin, n),
                        numpy.random.uniform(margin, 1.0 - margin, n)]).reshape([n, 2])


# Put the points onto a grid and FFT, skip to save time
def grid_data(sparse_data, shape, skip=100):
    grid = numpy.zeros(shape, dtype='complex')
    loc = numpy.round(shape * sparse_data).astype('int')
    for i in range(0, sparse_data.shape[0], skip):
        grid[loc[i, :]] = 1.0
    return numpy.fft.fft(grid).real


# Accumulate all psfs into one psf
def accumulate(psf_list):
    lpsf = 0.0 * psf_list[0]
    for p in psf_list:
        lpsf += p
    return lpsf


if __name__ == '__main__':
    import sys
    import time

    start = time.time()

    # Process nchunks each of length len_chunk 2d points, making a psf of size shape
    len_chunk = int(1e6)
    nchunks = 16
    shape = [512, 512]
    skip = 100

    # We pass in the scheduler from the invoking script
    if len(sys.argv) > 1:
        scheduler = sys.argv[1]
        client = Client(scheduler)
    else:
        client = Client()

    print("On initialisation", client)

    sparse_graph = [delayed(init_sparse)(len_chunk) for i in range(nchunks)]
    sparse_graph = client.compute(sparse_graph, sync=True)
    print("After first sparse_graph", client)

    xfr_graph = [delayed(grid_data)(s, shape=shape, skip=skip) for s in sparse_graph]
    xfr = client.compute(xfr_graph, sync=True)
    print("After xfr", client)

    tsleep = 120.0
    print("Sleeping now for %.1f seconds" % tsleep)
    time.sleep(tsleep)
    print("After sleep", client)

    sparse_graph = [delayed(init_sparse)(len_chunk) for i in range(nchunks)]
    # sparse_graph = client.compute(sparse_graph, sync=True)
    xfr_graph = [delayed(grid_data)(s, shape=shape, skip=skip) for s in sparse_graph]
    psf_graph = delayed(accumulate)(xfr_graph)
    psf = client.compute(psf_graph, sync=True)

    print("*** Successfully reached end in %.1f seconds ***" % (time.time() - start))
    print(numpy.max(psf))
    print("After psf", client)

    client.shutdown()
    exit()
Grep'ing for Client in a typical run shows:
On initialisation <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16>
After first sparse_graph <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16>
After xfr <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16>
After sleep <Client: scheduler='tcp://sand-8-17:8786' processes=4 cores=4>
After psf <Client: scheduler='tcp://sand-8-17:8786' processes=4 cores=4>
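To see exactly when workers disappear without watching the diagnostics page, one could poll the scheduler from the client. The sketch below assumes a distributed.Client, whose scheduler_info() returns a dict with a "workers" entry keyed by worker address; worker_addresses and watch_workers are hypothetical helpers for illustration, not part of dask/distributed.

```python
import time


# Hypothetical helper: assumes client.scheduler_info() returns a dict whose
# "workers" entry is keyed by worker address (as distributed.Client does).
def worker_addresses(client):
    return set(client.scheduler_info()["workers"])


# Hypothetical helper: poll n_polls times, interval seconds apart, printing
# every worker that joins or leaves. Returns (poll, n_workers, removed) tuples.
def watch_workers(client, n_polls=10, interval=5.0, sleep=time.sleep):
    history = []
    previous = worker_addresses(client)
    for i in range(n_polls):
        sleep(interval)
        current = worker_addresses(client)
        removed = sorted(previous - current)
        added = sorted(current - previous)
        for addr in removed:
            print("poll %d: worker removed %s" % (i, addr))
        for addr in added:
            print("poll %d: worker added %s" % (i, addr))
        history.append((i, len(current), removed))
        previous = current
    return history
```

For example, in the script above, watch_workers(client, n_polls=24, interval=5.0) could stand in for time.sleep(tsleep), covering the same 120 s idle period while reporting each removal as it happens. The sleep argument is injectable only so the helper can be exercised without a real cluster.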
Thanks, Tim
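We are still struggling with this. We see:
[worker openhpc-compute-1]: distributed.core - WARNING - Event loop was unresponsive for 1.12s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
[scheduler openhpc-compute-0:8786]: distributed.scheduler - INFO - Worker 'tcp://10.60.253.19:39025' failed from closed comm: TimeoutError: [Errno 110] Connection timed out
[scheduler openhpc-compute-0:8786]: distributed.scheduler - INFO - Remove worker tcp://10.60.253.19:39025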