tensorflow: non-chief worker stuck in distributed SYNC mode for graph with two optimizers

It seems that distributed TensorFlow cannot train a graph with two optimizers in sync mode (one for the local update, the other for the ps update).

There are three parts in my graph:

  1. Each worker keeps its own copy of the variables held on the ps server, defined as local variables with collections=[tf.GraphKeys.LOCAL_VARIABLES]. Each worker runs its own forward-backward loop on these local variables with its own optimizer.
  2. (local variable - ps variable) is treated as the gradient and applied to the ps variable with SyncReplicasOptimizer.apply_gradients.
  3. The ps variable is broadcast to the local variable.

The three parts run as follows: subgraph 1 runs several times, then subgraph 2 runs in distributed sync mode to update the ps params, and finally subgraph 3 runs.
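
As a rough illustration of this schedule, here is a minimal single-process sketch in plain Python, with scalars standing in for the variables. It is not the graph from this post: the names run_round, local_steps, lr and outer_rate are hypothetical, plain gradient steps replace the momentum optimizers, and the sign of the ps update (moving the ps value toward the workers' average) is an assumption.

```python
def run_round(ps, local_vars, grad_fns, local_steps, lr, outer_rate):
    """One round of the three-part schedule on scalar toy variables."""
    # Part 1: each worker takes several gradient steps on its own local copy.
    updated = []
    for w, grad_fn in zip(local_vars, grad_fns):
        for _ in range(local_steps):
            w = w - lr * grad_fn(w)
        updated.append(w)

    # Part 2: (local - ps) plays the role of the gradient. The sync step
    # averages it over all workers and applies it once to the ps value.
    # Assumption: the update moves ps toward the workers' average.
    avg_diff = sum(w - ps for w in updated) / len(updated)
    ps = ps + outer_rate * avg_diff

    # Part 3: broadcast the new ps value back to every local copy.
    return ps, [ps for _ in updated]


# Two toy workers minimising (w - 1)^2 and (w - 3)^2 respectively.
grad_fns = [lambda w: 2.0 * (w - 1.0), lambda w: 2.0 * (w - 3.0)]
ps, local_vars = 0.0, [0.0, 0.0]
for _ in range(40):
    ps, local_vars = run_round(ps, local_vars, grad_fns,
                               local_steps=1, lr=0.25, outer_rate=1.0)
print(ps)
```

With these toy losses the ps value ends up close to 2.0, the minimiser of the summed loss. In the distributed version, part 2 is the only step that needs every worker to take part at the same time.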

Source code / logs

    if args.job_name == 'ps':
        server.join()
    elif args.job_name == 'worker':
        is_chief = (args.task_index == 0)
        num_gpus = len(worker_spec)

        ps_device = '/job:ps/cpu:0'
        worker_device = '/job:worker/task:%d/gpu:0' % args.task_index
        with tf.device(
                tf.train.replica_device_setter(cluster=cluster, ps_device=ps_device, worker_device=worker_device)):
            global_step = tf.Variable(args.start_step, name='global_step', trainable=False)

            print 'building ps params'
            ps_tparams = init_tparams()

            print 'building local params'
            with tf.device(worker_device):
                worker_tparams = init_tparams(is_local=True)  # define variable in collection tf.GraphKeys.LOCAL_VARIABLES

            print 'building graph'

            print '-- local update'
            x, x_mask, y, y_mask, cost = build_graph(worker_tparams, config)
            opt = tf.train.MomentumOptimizer(config.lr, config.mr)
            updates = worker_tparams
            grads = tf.gradients(cost, updates, colocate_gradients_with_ops=True)
            clipped_grads, _ = tf.clip_by_global_norm(grads, config.clip_grads)
            train_op = opt.apply_gradients(zip(clipped_grads, updates))

            print '-- reduce average'
            ps_updates = ps_tparams
            avg_grads = [tf.sub(var, ps_var) for var, ps_var in zip(updates, ps_updates)]
            bopt = tf.train.MomentumOptimizer(config.blr, config.bmr, use_nesterov=True)
            bopt = tf.train.SyncReplicasOptimizerV2(bopt, replicas_to_aggregate=num_gpus, total_num_replicas=num_gpus)
            update_op = bopt.apply_gradients(zip(avg_grads, ps_updates), global_step=global_step)

            print '-- broadcast'
            broadcast_ops = []
            for kk, pp in ps_tparams.items():
                broadcast_ops.append(worker_tparams[kk].assign(pp).op)

            # Others related to sync mode
            chief_queue_runner = bopt.get_chief_queue_runner()
            sync_init_op = bopt.get_init_tokens_op()

            sv = tf.train.Supervisor(
                is_chief=is_chief,
                logdir=config.ckp,
                init_op=tf.global_variables_initializer(),
                local_init_op=tf.local_variables_initializer(),
                global_step=global_step)

            if is_chief:
                print('Worker %d: Initializing session' % args.task_index)
            else:
                print('Worker %d: Waiting for session to be initialized' % args.task_index)

            sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False,
                                         device_filters=['/job:ps', '/job:worker/task:%d' % args.task_index])
            with sv.managed_session(server.target, config=sess_config) as sess:
                print('Worker %d: Session initialization completed' % args.task_index)

                if is_chief:
                    # Chief worker will start the chief queue runner and call the init op
                    sess.run(sync_init_op)
                    sv.start_queue_runners(sess, [chief_queue_runner])

The non-chief workers get stuck at sv.managed_session and print the message below again and again:

    I tensorflow/core/distributed_runtime/master_session.cc:993] Start master session a4de1cec7011a62d with config:

The code runs successfully when there is no local optimizer.

System information

  • Linux Ubuntu 14.04
  • CUDA 8.0
  • tf 0.12.0-rc1
  • GPU: GeForce GTX 1080

About this issue

  • State: closed
  • Created 7 years ago
  • Reactions: 1
  • Comments: 18 (12 by maintainers)

Most upvoted comments

I don’t think it’s a bug, although without a self-contained example I can’t say for sure.

The most likely cause of sv.managed_session() hanging is that sv.prepare_or_wait_for_session() is blocking inside SessionManager.wait_for_session(). The non-chief workers block there until the chief has initialized all of the variables (and hence the ready_op returns True). Because you have a slightly unusual variable configuration, it's possible that the ready_op never returns True. You might need to customize the ready_op (or possibly the init_op) when constructing the tf.train.Supervisor to work for your configuration.
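
To make the blocking behaviour concrete, here is a toy model of that readiness check in plain Python. It is not TensorFlow code and not the Supervisor implementation. The variable names and the scenario (the local optimizer adding a per-worker variable that the chief never initializes) are assumptions for illustration only; nothing in this thread confirms that this is what happened.

```python
def missing_vars(ready_vars, initialized):
    """Toy stand-in for a ready_op: list the checked variables that are
    still uninitialized. An empty list means the model is ready."""
    return [v for v in ready_vars if v not in initialized]


# Hypothetical variable names. Assumption: each worker builds its own graph,
# and the local optimizer adds a per-worker variable to it.
ps_vars = ["ps/W", "global_step"]
chief_graph_vars = ps_vars + ["worker0/W/Momentum"]
non_chief_graph_vars = ps_vars + ["worker1/W/Momentum"]
non_chief_local_vars = ["worker1/W"]

# The chief's init_op only covers variables in the chief's own graph;
# the non-chief's local_init_op only covers its local variables.
initialized = set(chief_graph_vars) | set(non_chief_local_vars)

# A check over every variable in the non-chief's graph never becomes empty,
# so the non-chief would keep waiting.
default_check = missing_vars(non_chief_graph_vars + non_chief_local_vars,
                             initialized)

# A customized check restricted to the ps-hosted variables becomes empty
# as soon as the chief has run its init_op.
custom_check = missing_vars(ps_vars, initialized)

print(default_check)  # ['worker1/W/Momentum']
print(custom_check)   # []
```

In the real setup the analogous change would be to pass a custom ready_op to tf.train.Supervisor, for example one built with tf.report_uninitialized_variables over only the ps-hosted variables. Any per-worker variables excluded this way would still have to be initialized by the worker itself, for example in local_init_op. This is untested against the code in the question.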