
TensorFlow variables are not initialized using between-graph replication

I have the following Python code in test.py, which uses "between-graph replication" for distributed TensorFlow:

import argparse 
import logging 

import tensorflow as tf 

log = logging.getLogger(__name__) 

# Job Names 
PARAMETER_SERVER = "ps" 
WORKER_SERVER = "worker" 

# Cluster Details 
CLUSTER_SPEC = { 
    PARAMETER_SERVER: ["localhost:2222"], 
    WORKER_SERVER: ["localhost:1111", "localhost:1112"]} 


def parse_command_arguments(): 
    """ Set up and parse the command line arguments passed for experiment. """ 
    parser = argparse.ArgumentParser(
        description="Parameters and Arguments for the Test.")
    parser.add_argument(
        "--job_name",
        type=str,
        default="",
        help="One of 'ps', 'worker'"
    )
    # Flags for defining the tf.train.Server
    parser.add_argument(
        "--task_index",
        type=int,
        default=0,
        help="Index of task within the job"
    )

    return parser.parse_args() 


def start_server(job_name, task_index): 
    """ Create a server based on a cluster spec. """ 
    cluster = tf.train.ClusterSpec(CLUSTER_SPEC) 
    server = tf.train.Server(
        cluster, job_name=job_name, task_index=task_index)

    return server, cluster 


def model(): 
    """ Build up a simple estimator model. """ 
    # Build a linear model and predict values 
    # dtype must be passed as a keyword argument; the second positional
    # parameter of tf.Variable is `trainable`, not the dtype.
    W = tf.Variable([.3], dtype=tf.float32)
    b = tf.Variable([-.3], dtype=tf.float32)
    x = tf.placeholder(tf.float32) 
    linear_model = W * x + b 
    y = tf.placeholder(tf.float32) 
    global_step = tf.get_variable('global_step', [],
                                  initializer=tf.constant_initializer(0),
                                  trainable=False)

    # Loss sub-graph 
    loss = tf.reduce_sum(tf.square(linear_model - y)) 

    # optimizer 
    optimizer = tf.train.GradientDescentOptimizer(0.01) 
    train = optimizer.minimize(loss, global_step=global_step) 

    init_op = tf.global_variables_initializer() 
    log.info("Variables initialized ...") 

    return W, b, loss, x, y, train, global_step, init_op 


if __name__ == "__main__": 
    # Initializing logging with level "INFO". 
    logging.basicConfig(level=logging.INFO) 

    # Parse arguments from command line. 
    arguments = parse_command_arguments() 
    job_name = arguments.job_name 
    task_index = arguments.task_index 

    # Start a server. 
    server, cluster = start_server(job_name, task_index) 

    if job_name == "ps":
        server.join()
    else:
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % task_index,
                cluster=cluster)):
            W, b, loss, x, y, train, global_step, init_op = model()

        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=(arguments.task_index == 0 and
                          arguments.job_name == 'worker')) as sess:
            step = 0
            # training data
            x_train = [1, 2, 3, 4]
            y_train = [0, -1, -2, -3]
            while not sess.should_stop() and step < 1000:
                _, step = sess.run(
                    [train, global_step], {x: x_train, y: y_train})

            # evaluate training accuracy
            curr_W, curr_b, curr_loss = sess.run(
                [W, b, loss], {x: x_train, y: y_train})
            print("W: %s b: %s loss: %s" % (curr_W, curr_b, curr_loss))

I ran the code as three separate processes on a single machine (a Mac Pro, CPU only), in the following order:

  1. Parameter server: $ python test.py --task_index 0 --job_name ps
  2. Worker 1: $ python test.py --task_index 0 --job_name worker
  3. Worker 2: $ python test.py --task_index 1 --job_name worker

I found that the "Worker 2" process ran into an error:

$ python test.py --task_index 1 --job_name worker 
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:197] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222} 
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:197] Initialize GrpcChannelCache for job worker -> {0 -> localhost:1111, 1 -> localhost:1112} 
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:211] Started server with target: grpc://localhost:1112 
INFO:__main__:Variables initialized ... 
I tensorflow/core/distributed_runtime/master_session.cc:993] Start master session 9912c75f2921fe13 with config: 

INFO:tensorflow:Waiting for model to be ready. Ready_for_local_init_op: None, ready: Variables not initialized: Variable, Variable_1, global_step 
INFO:tensorflow:Waiting for model to be ready. Ready_for_local_init_op: None, ready: Variables not initialized: Variable, Variable_1, global_step 

and the "Worker 2" process simply freezes there. The log shows that the TensorFlow variables for "Worker 2" were never successfully initialized, so I am wondering whether there is a flaw in how MonitoredTrainingSession coordinates variable initialization across sessions, or whether I have missed something in the code.

NOTE: The code was running with TensorFlow 0.12.

Answer


I think this is "intended behavior" for the tf.train.MonitoredTrainingSession coordination protocol. In a recent answer, I explained how this protocol is geared toward long-running training jobs, which is why a worker sleeps for 30 seconds between checks of whether the variables have been initialized.

There is a race condition between Worker 1 running the initialization op and Worker 2 checking the variables. If Worker 2 "wins" the race, it observes that some variables are still uninitialized, and it goes to sleep for 30 seconds before checking again.
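For intuition, here is a rough sketch of the readiness loop a non-chief worker runs. The real logic lives inside tf.train.SessionManager; the helper name wait_for_ready and the log format below are illustrative only, though tf.report_uninitialized_variables() is the actual default "ready op":

import time

import tensorflow as tf

def wait_for_ready(sess, recovery_wait_secs=30):
    # Returns the names of any variables that are still uninitialized.
    ready_op = tf.report_uninitialized_variables()
    while True:
        uninitialized = sess.run(ready_op)
        if uninitialized.size == 0:
            return  # model is ready; training can begin
        print("Waiting for model to be ready. Variables not initialized: %s"
              % ", ".join(v.decode() for v in uninitialized))
        time.sleep(recovery_wait_secs)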

However, the total amount of computation in this program is very small, so within that 30-second window Worker 1 is able to finish all of its work and terminate. When Worker 2 next checks whether the variables are initialized, it creates a new tf.Session that tries to connect to the other tasks; since Worker 1 is no longer running, you will see a log message like this (repeated roughly every 10 seconds):

I tensorflow/core/distributed_runtime/master.cc:193] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0 

This is not a problem when the training job runs for substantially longer than 30 seconds.
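For a toy job this short, one alternative not mentioned in the original answer is to shorten the wait interval itself. tf.train.MonitoredTrainingSession does not expose that knob directly (at least in 0.12), but the older tf.train.Supervisor does, via its recovery_wait_secs argument (default 30 seconds). A minimal sketch, assuming the TF 0.12 Supervisor API:

# Sketch only: swap MonitoredTrainingSession for a Supervisor so the
# non-chief worker re-checks readiness every second instead of every 30.
sv = tf.train.Supervisor(
    is_chief=(arguments.task_index == 0 and arguments.job_name == 'worker'),
    init_op=init_op,
    global_step=global_step,
    recovery_wait_secs=1)  # default is 30 seconds

with sv.managed_session(server.target) as sess:
    # ... run training steps as before ...
    pass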

One workaround is to remove the interdependence between the workers by setting a "device filter". In a typical between-graph configuration the individual workers do not communicate with each other, so you can tell TensorFlow to ignore the absence of another worker at session-creation time, using tf.ConfigProto:

# Each worker only needs to contact the PS task(s) and the local worker task. 
config = tf.ConfigProto(device_filters=[ 
    '/job:ps', '/job:worker/task:%d' % arguments.task_index]) 

with tf.train.MonitoredTrainingSession(
    master=server.target, 
    config=config, 
    is_chief=(arguments.task_index == 0 and (
       arguments.job_name == 'worker'))) as sess: 
    # ... 
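With this filter in place, session creation on each worker waits only for the PS task(s) and the worker's own task, so Worker 2 no longer blocks on, or retries against, Worker 1's server after that process has exited.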

Thanks! This was really helpful! –