tensorflow: Distributed Training Randomly Stops During the Training Process

System information

  • Have I written custom code (as opposed to using a stock example script provided in TensorFlow): yes
  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): Ubuntu 16.04
  • TensorFlow version (use command below): v1.3.0-rc2-20-g0787eee 1.3.0
  • Python version: 3.5.2
  • CUDA/cuDNN version: 6.0
  • GPU model and memory: Tesla K80, 12G

Describe the problem

In my distributed training program, there are one server and two workers, which all run in separately nvidia-docker container. At the beginning, the cluster works just fine, but running normally after several hours, the two workers just stop.

My training process:

  1. I create three nvidia-docker containers, one for parameter server, two for workers
  2. In every container, I run the train_replica function below after defining all necessary parts such as cluster_spec, inference function, data batch and so on.
  3. It works correctly at the beginning
  4. It stops several hours later

Source code / logs

My trainer function:

def train_replica(cluster_spec,
                  get_data_batch,
                  inference_fn,
                  get_init_fn,
                  get_learning_rate,
                  get_optimizer,
                  get_train_variables,
                  replica_param,
                  train_param,
                  ):
    job_name = replica_param['job_name']
    task_index = replica_param['task_index']
    sync_replicas = train_param['sync_replicas']
    log_dir = train_param['log_dir']
    assert job_name in ['ps', 'worker']
    server = tf.train.Server(cluster_spec, job_name=job_name,
                             task_index=task_index, config=get_ps_session_config())
    if job_name == 'ps':
        server.join()
    else:
        is_chief = (task_index == 0)
        device_setter = tf.train.replica_device_setter(cluster=cluster_spec)
        with tf.Graph().as_default():
            with tf.device(device_setter):
                global_step = create_global_step()
                learning_rate = get_learning_rate(global_step)
                data_batch = get_data_batch()
                _ = inference_fn(data_batch)
                total_loss, task_loss = get_losses()
                optimizer = get_optimizer(learning_rate)
                if sync_replicas:
                    optimizer = tf.train.SyncReplicasOptimizer(
                        opt=optimizer,
                        replicas_to_aggregate=cluster_spec.num_tasks('worker'),
                        total_num_replicas=cluster_spec.num_tasks('worker'),
                        name='sync_replica_optimizer'
                    )
                train_op = slim.learning.create_train_op(
                    total_loss=total_loss,
                    optimizer=optimizer,
                    global_step=global_step,
                    variables_to_train=get_train_variables(),
                    clip_gradient_norm=train_param['clip_norm'],
                    gradient_multipliers=train_param['gradient_multipliers'],
                )
                init_fn = get_init_fn() if get_init_fn is not None else None
                scaffold = tf.train.Scaffold(
                    init_op=tf.global_variables_initializer())
                scaffold._init_fn = init_fn
                hooks = [tf.train.StopAtStepHook(train_param['train_steps'])]
                if sync_replicas is True:
                    hooks.append(optimizer.make_session_run_hook(is_chief))
                chief_only_hooks = [tf.train.LoggingTensorHook([total_loss, task_loss], 100)]
                step_ind = 0
                with tf.train.MonitoredTrainingSession(
                        master=server.target,
                        is_chief=is_chief,
                        checkpoint_dir=log_dir,
                        scaffold=scaffold,
                        hooks=hooks,
                        chief_only_hooks=chief_only_hooks,
                        config=get_worker_session_config(task_index)) as session:
                    while not session.should_stop():
                        session.run(train_op)
                        step_ind += 1
                        if step_ind % 1000 == 0:
                            tf.logging.debug('Training Step At {s}'.format(s=step_ind))

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 21 (6 by maintainers)

Most upvoted comments

To anyone facing hangs in the distributed mode, there was a bug in the version of gRPC used in TF 1.4 that would cause servers to stop serving after a (non-deterministic) period of time. This has been fixed at HEAD, and TensorFlow now uses a version of gRPC with the fix. I’d recommend trying to reproduce the problem with the tf-nightly build to rule out that old bug as the source of the problem.

@angersson I ran my experiment again after updating, so far It works fine and no stop happens