2016-10-01 129 views
4

當我們要使用分佈式TensorFlow,我們將使用關閉服務器TensorFlow

tf.train.Server.join() 

然而列表服務器,我無法找到任何方式關閉服務器,除了查殺處理。對於加入TensorFlow文檔()是

Blocks until the server has shut down. 
This method currently blocks forever. 

這是很困擾我,因爲我想爲計算創造衆多服務器和關閉它們,當一切結束。

有沒有可能的解決方案。

感謝

回答

9

通過使用session.run(dequeue_op)而不是server.join(),您可以根據需要讓參數服務器進程死亡,並讓另一個進程在您希望此進程死亡時將某些內容排入該隊列。

所以對於k參數服務器碎片您可以創建k隊列,具有獨特的shared_name屬性,並從該隊列嘗試dequeue。當您想關閉服務器時,您會將所有隊列和enqueue令牌循環到每個隊列中。這會導致session.run解鎖並且Python進程將運行到最後並退出,從而關閉服務器。

下面是2塊碎片一個獨立的例子摘自: https://gist.github.com/yaroslavvb/82a5b5302449530ca5ff59df520c369e

(多工/多碎片例如,見https://gist.github.com/yaroslavvb/ea1b1bae0a75c4aae593df7eca72d9ca

import subprocess 
import tensorflow as tf 
import time 
import sys 

flags = tf.flags 
flags.DEFINE_string("port1", "12222", "port of worker1") 
flags.DEFINE_string("port2", "12223", "port of worker2") 
flags.DEFINE_string("task", "", "internal use") 
FLAGS = flags.FLAGS 

# setup local cluster from flags 
host = "127.0.0.1:" 
cluster = {"worker": [host+FLAGS.port1, host+FLAGS.port2]} 
clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def() 

if __name__=='__main__': 
    if not FLAGS.task: # start servers and run client 

     # launch distributed service 
     def runcmd(cmd): subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT) 
     runcmd("python %s --task=0"%(sys.argv[0])) 
     runcmd("python %s --task=1"%(sys.argv[0])) 
     time.sleep(1) 

     # bring down distributed service 
     sess = tf.Session("grpc://"+host+FLAGS.port1) 
     queue0 = tf.FIFOQueue(1, tf.int32, shared_name="queue0") 
     queue1 = tf.FIFOQueue(1, tf.int32, shared_name="queue1") 
     with tf.device("/job:worker/task:0"): 
      add_op0 = tf.add(tf.ones(()), tf.ones(())) 
     with tf.device("/job:worker/task:1"): 
      add_op1 = tf.add(tf.ones(()), tf.ones(())) 

     print("Running computation on server 0") 
     print(sess.run(add_op0)) 
     print("Running computation on server 1") 
     print(sess.run(add_op1)) 

     print("Bringing down server 0") 
     sess.run(queue0.enqueue(1)) 
     print("Bringing down server 1") 
     sess.run(queue1.enqueue(1)) 

    else: # Launch TensorFlow server 
    server = tf.train.Server(clusterspec, config=None, 
          job_name="worker", 
          task_index=int(FLAGS.task)) 
    print("Starting server "+FLAGS.task) 
    sess = tf.Session(server.target) 
    queue = tf.FIFOQueue(1, tf.int32, shared_name="queue"+FLAGS.task) 
    sess.run(queue.dequeue()) 
    print("Terminating server"+FLAGS.task) 
+0

感謝您的回答。它工作得很好。但是當我嘗試使用'tf.Supervisor'(TF網站上的那個)來適應這個例子時,我遇到了一些問題。一旦我實例化了一個「supervisor」對象,該圖將被「終結」。因此我們不能在訓練後排隊。使用兩個圖表可能會工作,但根據[本文](http://stackoverflow.com/a/34249940/4811003),它可能會影響性能。有沒有一個好的解決方案? – fois

+0

'queue0.enqueue(1)'實際上創建一個enqueue操作並修改圖形。你可以改爲'op1 = queue0.enqueue(1); ; sess.run(op1)' –

+0

你說得對。我很愚蠢。 非常感謝。 – fois

3

目前還沒有任何清晰的方式來關閉TensorFlow GRPC服務器。它可能到shut down a gRPC server,但安全地執行此操作需要對所有正在進行的請求和響應緩衝區進行額外的內存管理,這需要大量額外的管道工作(最糟糕的一種:異步共享內存管理...)對於目前爲止沒有人要求—的功能!

在實踐中,您應該可以使用相同的tf.train.Server對象進行許多不同的計算。如果這不適用於您的用例,請隨時撥打open an GitHub issue並告訴我們更多關於您的使用案例。

+0

感謝您的回答。但是如果你在分佈式Tensorflow的文檔中使用這個例子,你會怎麼做?我的意思是,在計算之後,兩臺工作服務器完成,而兩臺參數服務器仍在運行。 – fois

+0

目前,我從命令行中終止參數服務器的進程。我想知道它是否安全? – fois

+0

但是,如果我在完成訓練後不殺死服務器,那麼ps中的變量將影響下一次訓練。是這樣嗎? – fois

0

纔會出現此頁面很常在谷歌,所以我想我會試着改進Yaroslav's answer,我希望對於那些剛剛進入分佈式Tensorflow的人來說,我希望這是一個更明確的答案。

import tensorflow as tf 
import threading 

def main(job_name, task): 
    cluster = tf.train.ClusterSpec({ 
     'ps': ['localhost:22222', 'localhost:22223'], 
     'worker': ['localhost: 22224','localhost: 22225','localhost: 22226'] 
    }) 

    server = tf.train.Server(cluster, job_name=job_name, task_index=task) 

    if job_name == 'ps': 
     # create a shared queue on the parameter server which is visible on /job:ps/task:%d 
     with tf.device('/job:ps/task:%d' % task): 
      queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % task) 

     # wait for the queue to be filled 
     with tf.Session(server.target) as sess: 
      for i in range(cluster.num_tasks('worker')): 
       sess.run(queue.dequeue()) 
       print('ps:%d received "done" from worker:%d' % (task, i)) 
      print('ps:%d quitting' % task) 

    elif job_name == 'worker': 
     queues = [] 
     # create a shared queue on the worker which is visible on /job:ps/task:%d 
     for i in range(cluster.num_tasks('ps')): 
      with tf.device('/job:ps/task:%d' % i): 
       queues.append(tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % i)) 

     # fill the queue 
     with tf.Session(server.target) as sess: 
      for i in range(cluster.num_tasks('ps')): 
       _, size = sess.run([queues[i].enqueue(task), queues[i].size()]) 
       print('Worker:%d sending "done" to ps:%d [elements=%d]' % (task, i, size)) 

if __name__ == '__main__': 
    threads = [ 
     threading.Thread(target=main, args=('ps', 0)), 
     threading.Thread(target=main, args=('ps', 1)), 
     threading.Thread(target=main, args=('worker', 0)), 
     threading.Thread(target=main, args=('worker', 1)), 
     threading.Thread(target=main, args=('worker', 2))] 
    for thread in threads: 
     thread.start() 
    for thread in threads: 
     thread.join() 

這是很簡單的用這個片段替換代碼的工人節在「規範」 Distributed Tensorflow example擴展:

# create a worker that does nothing 
    elif job_name == 'worker': 
     with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:%d' % task, cluster=cluster)): 
      global_step = tf.train.get_or_create_global_step() 
      no_op = tf.no_op() 

     done_ops = [] 
     # create a shared queue on the worker which is visible on /job:ps/task:%d 
     for i in range(cluster.num_tasks('ps')): 
      with tf.device('/job:ps/task:%d' % i): 
       done_queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue' + str(i)) 
       done_ops.append(done_queue.enqueue(task)) 

     hooks=[tf.train.StopAtStepHook(last_step=1), 
       tf.train.FinalOpsHook([done_ops])] 

     with tf.train.MonitoredTrainingSession(master=server.target, 
               is_chief=(task == 0), 
               hooks=hooks) as sess: 
      sess.run([no_op]) 

注意,MonitoredTrainingSession版本似乎是在連接所有慢得多的工人在一起。