tensorflow: Distributed TensorFlow worker hangs at TF_CloseSession() when using MonitoredTrainingSession

System information

  • Have I written custom code (as opposed to using a stock example script provided in TensorFlow): yes
  • Mobile device (e.g. iPhone 8, Pixel 2, Samsung Galaxy) if the issue happens on mobile device: N/A
  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): RHEL7 and also Mac OS X 10.13.6
  • TensorFlow installed from (source or binary): binary (pip install)
  • TensorFlow version (use command below): 1.9.0+
  • Python version: 2.7 (RHEL7), 3.6 (Mac)
  • Bazel version (if compiling from source): N/A
  • GCC/Compiler version (if compiling from source): N/A
  • CUDA/cuDNN version: N/A
  • GPU model and memory: N/A
  • Exact command to reproduce: See below

Describe the problem

When using MonitoredTrainingSession in TensorFlow 1.9 or higher, I see the following deadlock/hang (as reported by the hanging-threads pip package) when the context manager exits. Note: I do not see this hang with version 1.8 or earlier, and it also does not occur when using the older tf.train.Supervisor API.

----------     Thread 140682110711616 hangs       ----------
	File "trainer.py", line 102, in <module>
		tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
	File "/usr/lib/python2.7/site-packages/tensorflow/python/platform/app.py", line 125, in run
		_sys.exit(main(argv))
	File "trainer.py", line 67, in main
		print("step: {}".format(step))
	File "/usr/lib/python2.7/site-packages/tensorflow/python/training/monitored_session.py", line 689, in __exit__
		self._close_internal(exception_type)
	File "/usr/lib/python2.7/site-packages/tensorflow/python/training/monitored_session.py", line 726, in _close_internal
		self._sess.close()
	File "/usr/lib/python2.7/site-packages/tensorflow/python/training/monitored_session.py", line 974, in close
		self._sess.close()
	File "/usr/lib/python2.7/site-packages/tensorflow/python/training/monitored_session.py", line 1121, in close
		_WrappedSession.close(self)
	File "/usr/lib/python2.7/site-packages/tensorflow/python/training/monitored_session.py", line 974, in close
		self._sess.close()
	File "/usr/lib/python2.7/site-packages/tensorflow/python/training/monitored_session.py", line 974, in close
		self._sess.close()
	File "/usr/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 690, in close
		tf_session.TF_CloseSession(self._session)

The code that triggers this is based on the Distributed TensorFlow documentation (with a trivial/dummy model). I start one PS node and two worker nodes on a single box as follows:

rm -rf /tmp/train_logs; \
python trainer.py \
     --ps_hosts=localhost:2222 \
     --worker_hosts=localhost:2223,localhost:2224 \
     --job_name=ps --task_index=0

python trainer.py \
     --ps_hosts=localhost:2222 \
     --worker_hosts=localhost:2223,localhost:2224 \
     --job_name=worker --task_index=0

python trainer.py \
     --ps_hosts=localhost:2222 \
     --worker_hosts=localhost:2223,localhost:2224 \
     --job_name=worker --task_index=1
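
For reference, trainer.py is essentially the skeleton from the Distributed TensorFlow tutorial. A rough sketch of it is below (the flag parsing and the dummy train op here are a reconstruction, not the exact script):

import argparse
import sys

import tensorflow as tf

FLAGS = None


def main(_):
    cluster = tf.train.ClusterSpec({'ps': FLAGS.ps_hosts.split(','),
                                    'worker': FLAGS.worker_hosts.split(',')})
    server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)

    if FLAGS.job_name == 'ps':
        server.join()
        return

    # Pin variables to the ps tasks and ops to this worker.
    with tf.device(tf.train.replica_device_setter(
            worker_device='/job:worker/task:%d' % FLAGS.task_index,
            cluster=cluster)):
        global_step = tf.train.get_or_create_global_step()
        train_op = tf.assign_add(global_step, 1)  # stands in for the dummy model

    hooks = [tf.train.StopAtStepHook(last_step=10000)]
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir='/tmp/train_logs',
                                           hooks=hooks) as mon_sess:
        while not mon_sess.should_stop():
            step = mon_sess.run(train_op)
            print("step: {}".format(step))
    # The hang occurs here, when __exit__ closes the wrapped session.


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--ps_hosts', type=str, default='')
    parser.add_argument('--worker_hosts', type=str, default='')
    parser.add_argument('--job_name', type=str, default='')
    parser.add_argument('--task_index', type=int, default=0)
    FLAGS, unparsed = parser.parse_known_args()
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)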

I’ve been able to reproduce this quite consistently on:

  • Mac OS X 10.13.6, Python 3.6, TensorFlow 1.10
  • RHEL7, Python 2.7, TensorFlow 1.9

And the symptom goes away when switching to 1.8 or earlier.
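
For comparison, a tf.train.Supervisor-based worker loop (the older API mentioned above, which does not show the hang) looks roughly like this. This is a sketch that reuses the same cluster/server/flag setup as the trainer above; only the worker loop differs:

# With the Supervisor API the worker exits cleanly after training.
global_step = tf.train.get_or_create_global_step()
train_op = tf.assign_add(global_step, 1)

sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir='/tmp/train_logs',
                         global_step=global_step)

with sv.managed_session(server.target) as sess:
    step = 0
    while not sv.should_stop() and step < 10000:
        step = sess.run(train_op)
        print("step: {}".format(step))
sv.stop()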

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 19 (11 by maintainers)

Most upvoted comments

[Update: On closer inspection, the issue I encountered seems slightly different from the one reported in this issue, but I’ll leave my post below in case it’s helpful to someone.]

I had a similar issue doing asynchronous training with MNIST where, once the first worker finished, the rest of the workers would hang. I managed to solve this by disabling communication between workers (each worker only needs to talk to the ps): https://github.com/linkedin/TonY/pull/120/files

# Each worker's session only sees the ps devices and its own devices,
# so it never opens channels to the other workers.
config_proto = tf.ConfigProto(device_filters=['/job:ps', '/job:worker/task:%d' % task_index])
...
with tf.train.MonitoredTrainingSession(master=server.target,
                                       is_chief=(task_index == 0),
                                       checkpoint_dir=FLAGS.working_dir,
                                       hooks=hooks,
                                       config=config_proto) as sess:
...

@erwa thank you, that works fine

I have a similar problem when the two workers don't close at the same time.

In the following code, I deliberately delay one worker's session from closing, which can cause the program to fail to terminate:

import os
import subprocess
import sys
import time

_CLUSTER_SPEC = {
    'ps': ['localhost:2222'],
    'worker': ['localhost:2223', 'localhost:2224']
}


def main_task(job_name, task_index):
    import tensorflow as tf

    server = tf.train.Server(server_or_cluster_def=_CLUSTER_SPEC, job_name=job_name, task_index=task_index)

    if job_name == 'ps':
        server.join()
    elif job_name == 'worker':
        with tf.device(tf.train.replica_device_setter(cluster=_CLUSTER_SPEC)):
            train_op = tf.assign_add(tf.train.get_or_create_global_step(), 1)

        print('[Worker {}] Creating session'.format(task_index), flush=True)

        with tf.train.MonitoredTrainingSession(master=server.target,
                                               is_chief=task_index == 0,
                                               hooks=[tf.train.StopAtStepHook(last_step=10000)]) as session:
            print('[Worker {}] Session created'.format(task_index), flush=True)

            while not session.should_stop():
                global_step = session.run(train_op)

                if global_step % 1000 == 0:
                    print('[Worker {}] Step = {}'.format(task_index, global_step), flush=True)

            print('[Worker {}] Closing session'.format(task_index), flush=True)

            # Delay the worker deliberately.

            if task_index == 1:
                time.sleep(1)

        print('[Worker {}] Session closed'.format(task_index), flush=True)


def _new_task(process_list, job_name, task_index):
    # Re-invoke this script in a subprocess for the given job/task,
    # with gRPC debug logging enabled.
    env = os.environ.copy()

    env['GRPC_VERBOSITY'] = 'DEBUG'

    process = subprocess.Popen(args=[sys.executable, __file__, job_name, str(task_index)], env=env)

    process_list.append(process)

    return process


def main_entry_point():
    # Launch every task in _CLUSTER_SPEC, wait for the workers to finish,
    # then terminate all remaining processes (including the ps).
    created_processes = []

    try:
        processes = {job_name: [_new_task(process_list=created_processes, job_name=job_name, task_index=task_index)
                                for task_index in range(len(task_targets))]
                     for job_name, task_targets in _CLUSTER_SPEC.items()}

        for process in processes['worker']:
            process.wait()
    finally:
        for process in reversed(created_processes):
            process.terminate()
            process.wait()


def main(args):
    if args:
        job_name, task_index_str = args

        main_task(job_name=job_name, task_index=int(task_index_str))
    else:
        main_entry_point()


if __name__ == '__main__':
    main(sys.argv[1:])

I have also set GRPC_VERBOSITY to DEBUG, and I can see that worker 1 still tries to contact worker 0 even after it has closed.
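
In case it helps, the device_filters workaround from the earlier comment could be plugged into the repro's MonitoredTrainingSession call like this (an untested sketch; whether it avoids this particular hang is not confirmed):

# Restrict each worker's session to the ps and its own task, so closing
# one worker does not depend on gRPC calls to the other worker.
config_proto = tf.ConfigProto(device_filters=['/job:ps', '/job:worker/task:%d' % task_index])

with tf.train.MonitoredTrainingSession(master=server.target,
                                       is_chief=task_index == 0,
                                       hooks=[tf.train.StopAtStepHook(last_step=10000)],
                                       config=config_proto) as session:
    ...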