tensorflow: SyncReplicasOptimizer race condition strange behavior?
It seems there is a race condition in SyncReplicasOptimizer leading to strange behaviour. Below I include example code that reproduces what seems to be a bug (hopefully in my code), as well as the commands to reproduce it (the code is pretty much the same as in mnist_replica.py).
I am trying to implement synchronous SGD using SyncReplicasOptimizer, and I also use the queue trick to make the parameter server stop gracefully when all workers are done. I have 4 workers and 1 parameter server; worker 0 is the chief worker.
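For clarity, the queue trick I mean is just a shared FIFO queue pinned to the parameter server: each worker enqueues a token when it is done, and the ps blocks on one dequeue per worker before exiting. A minimal sketch of the pattern (same `shared_name` and capacity as in the full test.py below):
```python
# Minimal sketch of the graceful-shutdown "done queue" used in test.py below.
import tensorflow as tf

num_workers = 4

# Both the ps and every worker build the same shared queue on the ps device.
with tf.device("/job:ps/task:0"):
    done_queue = tf.FIFOQueue(num_workers, tf.int32, shared_name="done_queue")

# ps process: block until every worker has signalled completion, then exit.
#     for _ in range(num_workers):
#         sess.run(done_queue.dequeue())
#
# worker process: signal completion after leaving the training loop.
#     sess.run(done_queue.enqueue(1))
```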
Please bear with me for the long explanation of the different issues (they depend on the order in which the processes are launched).
**** First kind of issue ****
Launch the processes in this order:
```
python test.py --job_name ps
python test.py --job_name worker --task_index 0
python test.py --job_name worker --task_index 1
python test.py --job_name worker --task_index 2
python test.py --job_name worker --task_index 3
```
The last worker throws the following error:
```
I tensorflow/core/distributed_runtime/master_session.cc:909] DeregisterGraph error: Unavailable: {"created":"@1488366991.043859719","description":"OS Error","errno":104,"file":"external/grpc/src/core/lib/iomgr/tcp_posix.c","file_line":229,"grpc_status":14,"os_error":"Connection reset by peer","syscall":"recvmsg"}
```
and quits. It also happens that it hangs instead: it never realises that the variable `epoch` is greater than 4, which should trigger the `break` from the training loop and then the enqueue operation that lets the ps stop gracefully.
It also happens that everything is fine and the execution terminates without any errors.
**** Second kind of issue ****
Launch the processes in this order:
```
python test.py --job_name ps
python test.py --job_name worker --task_index 3
python test.py --job_name worker --task_index 2
python test.py --job_name worker --task_index 1
python test.py --job_name worker --task_index 0
```
Here the chief is launched last.
Strangely, the chief completes the loop and quits (I thought that with SyncReplicasOptimizer it had to wait for the other workers to complete each step).
As for the other workers, I got all sorts of results when repeating the same experiment several times:
- Some workers simply hang and do not execute a single step of the `while True` training loop.
- Some execute a few steps and then simply hang; apparently they lose contact with the chief and never realise that the variable `epoch` is greater than 4, which would trigger the `break` from the training loop.
Thank you for your help with this issue.
Below is the code of test.py:
```python
import os
import shutil
import tempfile
import numpy as np
import pandas as pd
import argparse
from keras.models import Sequential
from keras.layers.core import Dense
from keras.regularizers import l2
import tensorflow as tf
import keras

nb_samples = 50
nb_features = 5
X_train = np.random.randn(nb_samples * nb_features).reshape((nb_samples, nb_features))
Y_train = np.random.randn(nb_samples).reshape((nb_samples, 1))


def build_keras_model(input_dim):
    hidden_dim = 10
    model = Sequential()
    model.add(Dense(input_dim=input_dim,
                    output_dim=hidden_dim,
                    activation='tanh'))
    model.add(Dense(output_dim=1, activation='linear'))
    model.compile(loss='mse', optimizer='adam')
    return model

################################################
# DISTRIBUTE
################################################
parser = argparse.ArgumentParser(description='tensorflow')
parser.add_argument('--job_name', dest='job_name')
parser.add_argument('--task_index', dest='task_index', default=0)
args = parser.parse_args()

ps_hosts = ['localhost:2222']
worker_hosts = ['localhost:2223', 'localhost:2224', 'localhost:2225', 'localhost:2226']
job_name = args.job_name
task_index = int(args.task_index)

# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(cluster,
                         job_name=job_name,
                         task_index=task_index,
                         config=tf.ConfigProto(log_device_placement=True,
                                               inter_op_parallelism_threads=1,
                                               intra_op_parallelism_threads=1))

if job_name == 'ps':
    # Parameter server: wait for one "done" token per worker, then exit.
    with tf.device("/job:ps/task:0"):
        queue = tf.FIFOQueue(len(worker_hosts), tf.int32, shared_name="done_queue")
    sess = tf.Session(server.target)
    # wait until all workers are done
    for i in range(len(worker_hosts)):
        sess.run(queue.dequeue())
else:
    # Worker: build the replicated model under replica_device_setter.
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % task_index,
            cluster=cluster)):
        keras.backend.set_learning_phase(1)
        keras.backend.manual_variable_initialization(True)
        model = build_keras_model(nb_features)
        preds = model.output
        targets = tf.placeholder(tf.float32, [None, 1])
        total_loss = tf.reduce_mean(
            keras.objectives.mean_squared_error(targets, preds))
        global_step = tf.Variable(0, name="global_step", trainable=False)
        # For early stopping management
        epoch = tf.Variable(0, name="epoch", trainable=False)
        inc_epoch_op = tf.assign_add(epoch, 1)

        is_chief = (task_index == 0)
        opt = tf.train.AdamOptimizer()
        num_workers = len(worker_hosts)
        replicas_to_aggregate = num_workers
        opt = tf.train.SyncReplicasOptimizer(
            opt,
            replicas_to_aggregate=replicas_to_aggregate,
            total_num_replicas=num_workers,
            name="sync_replicas")
        train_op = opt.minimize(total_loss, global_step=global_step)

        local_init_op = opt.local_step_init_op
        if is_chief:
            local_init_op = opt.chief_init_op
        ready_for_local_init_op = opt.ready_for_local_init_op
        # Initial token and chief queue runners required by the sync_replicas mode
        chief_queue_runner = opt.get_chief_queue_runner()
        sync_init_op = opt.get_init_tokens_op()
        init_op = tf.global_variables_initializer()

    # Shared done-queue used to tell the ps that this worker has finished.
    with tf.device("/job:ps/task:0"):
        queue = tf.FIFOQueue(len(worker_hosts), tf.int32, shared_name="done_queue")
        enqueue_op = queue.enqueue(1)

    train_dir = tempfile.mkdtemp(prefix='worker_%d' % task_index)
    sv = tf.train.Supervisor(
        is_chief=is_chief,
        logdir=train_dir,
        init_op=init_op,
        local_init_op=local_init_op,
        ready_for_local_init_op=ready_for_local_init_op,
        recovery_wait_secs=1,
        global_step=global_step)
    print '######################################### ALL CREATED'
    sess = sv.prepare_or_wait_for_session(server.target)
    keras.backend.set_session(sess)
    print '####### SESSION OK ********'

    if is_chief:
        sess.run(sync_init_op)
        sv.start_queue_runners(sess, [chief_queue_runner])

    local_step = 0
    while True:
        train_feed = {model.input: X_train, targets: Y_train}
        _, step = sess.run([train_op, global_step], feed_dict=train_feed)
        loss = sess.run(total_loss, feed_dict=train_feed)
        if is_chief:
            sess.run(inc_epoch_op)
        local_step += 1
        print '## epoch ', epoch.eval(sess)
        if epoch.eval(sess) > 4:
            print '###################### TRYING TO LEAVE'
            break

    shutil.rmtree(train_dir)
    print '###################### WHILE LOOP LEFT'
    sess.run(enqueue_op)
    print '## ENQUEUE OP DONE'
```
About this issue
- State: closed
- Created 7 years ago
- Comments: 35 (23 by maintainers)
@npfp Hi, I have encountered the same problem. Have you solved it?
Hi @jmchen-g,
I’m facing the same kind of issues using tensorflow 1.0.0. In a distributed run, for most of the runs the non-chief workers get stuck and log from time to time:
In some runs, they start working and print some logs as expected:
It might be related to the issue you mentioned, as I'm also using `tf.train.replica_device_setter("/job:worker/task:%d" % FLAGS.task_index)`. I wonder if you could give any insights and/or workarounds on this issue?
Many thanks,