I have implemented a variational autoencoder with TensorFlow on a single machine. Now I am trying to run it on my cluster using the distributed mechanism that TensorFlow provides, but the following problem has bothered me for days: running distributed TensorFlow fails with InvalidArgumentError: You must feed a value for placeholder tensor 'Placeholder' with dtype float.

Traceback (most recent call last): 
    File "/home/yama/mfs/ZhuSuan/examples/vae.py", line 265, in <module> 
    print('>> Test log likelihood = {}'.format(np.mean(test_lls))) 
    File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__ 
    self.gen.throw(type, value, traceback) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 942, in managed_session 
    self.stop(close_summary_writer=close_summary_writer) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 768, in stop 
    stop_grace_period_secs=self._stop_grace_secs) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/coordinator.py", line 322, in join 
    six.reraise(*self._exc_info_to_raise) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/coordinator.py", line 267, in stop_on_exception 
    yield 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/coordinator.py", line 411, in run 
    self.run_loop() 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 972, in run_loop 
    self._sv.global_step]) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 372, in run 
    run_metadata_ptr) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 636, in _run 
    feed_dict_string, options, run_metadata) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 708, in _do_run 
    target_list, options, run_metadata) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 728, in _do_call 
    raise type(e)(node_def, op, message) 
tensorflow.python.framework.errors.InvalidArgumentError: You must feed a value for placeholder tensor 'Placeholder' with dtype float 
    [[Node: Placeholder = Placeholder[dtype=DT_FLOAT, shape=[], _device="/job:worker/replica:0/task:0/gpu:0"]()]] 
    [[Node: model_1/fully_connected_10/Relu_G88 = _Recv[client_terminated=false, recv_device="/job:worker/replica:0/task:0/cpu:0", send_device="/job:worker/replica:0/task:0/gpu:0", send_device_incarnation=3964479821165574552, tensor_name="edge_694_model_1/fully_connected_10/Relu", tensor_type=DT_FLOAT, _device="/job:worker/replica:0/task:0/cpu:0"]()]] 
Caused by op u'Placeholder', defined at: 
    File "/home/yama/mfs/ZhuSuan/examples/vae.py", line 201, in <module> 
    x = tf.placeholder(tf.float32, shape=(None, x_train.shape[1])) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/ops/array_ops.py", line 895, in placeholder 
    name=name) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/ops/gen_array_ops.py", line 1238, in _placeholder 
    name=name) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/ops/op_def_library.py", line 704, in apply_op 
    op_def=op_def) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 2260, in create_op 
    original_op=self._default_original_op, op_def=op_def) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 1230, in __init__ 
    self._traceback = _extract_stack() 

Here is my code; for brevity I have only pasted the main function:

if __name__ == "__main__":
    tf.set_random_seed(1234)

    # Load MNIST
    data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                             'data', 'mnist.pkl.gz')
    x_train, t_train, x_valid, t_valid, x_test, t_test = \
        dataset.load_mnist_realval(data_path)
    x_train = np.vstack([x_train, x_valid])
    np.random.seed(1234)
    x_test = np.random.binomial(1, x_test, size=x_test.shape).astype('float32')

    # Define hyper-parameters
    n_z = 40

    # Define training/evaluation parameters
    lb_samples = 1
    ll_samples = 5000
    epoches = 10
    batch_size = 100
    test_batch_size = 100
    iters = x_train.shape[0] // batch_size
    test_iters = x_test.shape[0] // test_batch_size
    test_freq = 10

    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")

    # Create a cluster from the parameter server and worker hosts.
    clusterSpec = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    print("Create and start a server for the local task.")
    # Create and start a server for the local task.
    server = tf.train.Server(clusterSpec,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)

    print("Start ps and worker server")
    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        # Set the distributed device.
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                cluster=clusterSpec)):

            print("Build the training computation graph")
            # Build the training computation graph
            x = tf.placeholder(tf.float32, shape=(None, x_train.shape[1]))
            optimizer = tf.train.AdamOptimizer(learning_rate=0.001, epsilon=1e-4)
            with tf.variable_scope("model") as scope:
                with pt.defaults_scope(phase=pt.Phase.train):
                    train_model = M1(n_z, x_train.shape[1])
                    train_vz_mean, train_vz_logstd = q_net(x, n_z)
                    train_variational = ReparameterizedNormal(
                        train_vz_mean, train_vz_logstd)
                    grads, lower_bound = advi(
                        train_model, x, train_variational, lb_samples, optimizer)
                    infer = optimizer.apply_gradients(grads)

            print("Build the evaluation computation graph")
            # Build the evaluation computation graph
            with tf.variable_scope("model", reuse=True) as scope:
                with pt.defaults_scope(phase=pt.Phase.test):
                    eval_model = M1(n_z, x_train.shape[1])
                    eval_vz_mean, eval_vz_logstd = q_net(x, n_z)
                    eval_variational = ReparameterizedNormal(
                        eval_vz_mean, eval_vz_logstd)
                    eval_lower_bound = is_loglikelihood(
                        eval_model, x, eval_variational, lb_samples)
                    eval_log_likelihood = is_loglikelihood(
                        eval_model, x, eval_variational, ll_samples)

            global_step = tf.Variable(0)
            saver = tf.train.Saver()
            summary_op = tf.merge_all_summaries()
            init_op = tf.initialize_all_variables()

        # Create a "supervisor", which oversees the training process.
        sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                                 logdir=LogDir,
                                 init_op=init_op,
                                 summary_op=summary_op,
                                 saver=saver,
                                 global_step=global_step,
                                 save_model_secs=600)
        # Run the inference
        with sv.managed_session(server.target) as sess:
            epoch = 0
            while not sv.should_stop() and epoch < epoches:
                epoch += 1  # advance the epoch counter
                np.random.shuffle(x_train)
                lbs = []
                for t in range(iters):
                    x_batch = x_train[t * batch_size:(t + 1) * batch_size]
                    x_batch = np.random.binomial(
                        n=1, p=x_batch, size=x_batch.shape).astype('float32')
                    _, lb = sess.run([infer, lower_bound],
                                     feed_dict={x: x_batch})
                    lbs.append(lb)
                if epoch % test_freq == 0:
                    test_lbs = []
                    test_lls = []
                    for t in range(test_iters):
                        test_x_batch = x_test[
                            t * test_batch_size:(t + 1) * test_batch_size]
                        test_lb, test_ll = sess.run(
                            [eval_lower_bound, eval_log_likelihood],
                            feed_dict={x: test_x_batch})
                        test_lbs.append(test_lb)
                        test_lls.append(test_ll)
                    print('>> Test lower bound = {}'.format(np.mean(test_lbs)))
                    print('>> Test log likelihood = {}'.format(np.mean(test_lls)))
        sv.stop()

I have been trying to fix my code for days, but all my attempts have failed. Looking forward to your help!

Answers


The most likely cause of this exception is that one of the operations that the tf.train.Supervisor runs in the background depends on the tf.placeholder() tensor x, but does not have enough information to feed a value for it.

The most likely culprit is summary_op = tf.merge_all_summaries(), because library code often summarizes values that depend on the training data. To prevent the supervisor from collecting summaries in the background, pass summary_op=None to the tf.train.Supervisor constructor:

# Create a "supervisor", which oversees the training process.
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir=LogDir,
                         init_op=init_op,
                         summary_op=None,
                         saver=saver,
                         global_step=global_step,
                         save_model_secs=600)

After doing this, you will have to make alternative arrangements for collecting summaries. The easiest way is to pass summary_op to sess.run() periodically, and then pass the result to sv.summary_computed().
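A minimal sketch of that pattern (not code from the original answer), reusing names from the question's code (infer, lower_bound, x, x_train, iters, batch_size, LogDir, saver, global_step, server) and assuming the graph defines at least one summary; running the summary op on every step is only illustrative:

summary_op = tf.merge_all_summaries()

sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir=LogDir,
                         summary_op=None,  # no background summary collection
                         saver=saver,
                         global_step=global_step)

with sv.managed_session(server.target) as sess:
    for t in range(iters):
        x_batch = x_train[t * batch_size:(t + 1) * batch_size]
        # Run the summary op together with the training ops, so the
        # placeholder x is always fed when summaries are computed.
        _, lb, summary = sess.run([infer, lower_bound, summary_op],
                                  feed_dict={x: x_batch})
        # Hand the computed summary to the supervisor's own writer.
        sv.summary_computed(sess, summary)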


Hi, mrry. I have tried your suggestion in several ways. – sproblvem


Sorry, the last comment was incomplete. I tried your solution, but the error persists. If the program calls sess.run(summary_op) and sv.summary_computed(), the error log still reminds me that "You must feed a value for placeholder tensor". Alternatively, if I just set summary_op=None without periodically running sess.run(summary_op), the program gets stuck. Any other suggestions? Thank you, and looking forward to your reply. – sproblvem


I had the exact same problem. Following mrry's suggestion, I was able to work it out:

  1. Disable summary logging in the supervisor by setting summary_op=None (as mrry suggested).
  2. Create my own summary_op and pass it to sess.run() along with the rest of the evaluation ops. Hold on to the resulting summary; let's say it's called 'my_summary'.
  3. Create my own summary writer and call it with 'my_summary', e.g.: summary_writer.add_summary(summary, epoch_count).

To clarify, I did not use mrry's suggestion of doing sess.run(summary_op) and sv.summary_computed(); instead, I ran summary_op together with the other ops and then wrote out the summaries myself, as in the sketch below. You may also want to condition the summary writing on being the chief.
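A minimal sketch of this recipe (my names my_summary_op, my_summary, and epoch_count are illustrative; infer, x, x_train, batch_size, LogDir, epoches, saver, global_step, and server are reused from the question's code, and the single batch per epoch is only for brevity):

my_summary_op = tf.merge_all_summaries()

sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir=LogDir,
                         summary_op=None,  # bypass the supervisor's summary service
                         saver=saver,
                         global_step=global_step)

# Create our own summary writer; only the chief writes summaries.
if FLAGS.task_index == 0:
    summary_writer = tf.train.SummaryWriter(LogDir)

with sv.managed_session(server.target) as sess:
    for epoch_count in range(epoches):
        x_batch = x_train[:batch_size]  # illustrative single batch per epoch
        # Run my_summary_op alongside the training op so the placeholder is fed.
        _, my_summary = sess.run([infer, my_summary_op],
                                 feed_dict={x: x_batch})
        if FLAGS.task_index == 0:
            summary_writer.add_summary(my_summary, epoch_count)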

So basically, you need to completely bypass the Supervisor's summary-writing service. This seems like a surprising limitation/flaw of Supervisor, since it is not uncommon to want to log things that depend on the input (which lives in a placeholder). For example, in my network (an autoencoder), the cost depends on the input.


Ran into something similar. The chief was failing with the above error message. However, since I was using MonitoredTrainingSession rather than a self-made Supervisor, I was able to solve the problem by disabling the default summaries. To disable them, you have to supply the following to the constructor of MonitoredTrainingSession:

save_summaries_secs=None, 
save_summaries_steps=None, 

After that, everything went smoothly! Code on Github
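A minimal sketch of that constructor call (not the commenter's actual code, which was linked on GitHub); server, LogDir, infer, x, and x_batch are reused from the question's code:

with tf.train.MonitoredTrainingSession(
        master=server.target,
        is_chief=(FLAGS.task_index == 0),
        checkpoint_dir=LogDir,
        save_summaries_secs=None,            # disable the time-based summary hook
        save_summaries_steps=None) as sess:  # disable the step-based hook too
    while not sess.should_stop():
        sess.run(infer, feed_dict={x: x_batch})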
