tensorflow: Cache lockfile already exists
Using the `train_and_evaluate` method from the Estimator API together with a filesystem cache, I get an error because the evaluation starts before the whole cache has been written to the filesystem. When training runs a second time, after the evaluation, it fails because it finds the lock file. If the cache is completely written before the first evaluation, I don't get the error.
Here is a snippet that is runnable on Colab:
import tensorflow as tf
import numpy as np
from tensorflow import keras as ks
import itertools
def batch_reshape(a, b):
    """Attach static shapes to the pair produced by the ``py_func`` map.

    ``set_shape`` is called in place on both arguments: ``a`` receives
    ``[None, None, 3]`` and ``b`` receives ``[None, None, 1]``.

    Args:
        a: First element of the pair; must provide ``set_shape``.
        b: Second element of the pair; must provide ``set_shape``.

    Returns:
        The same two objects, as the tuple ``(a, b)``.
    """
    a.set_shape([None, None, 3])
    b.set_shape([None, None, 1])
    return (a, b)
def gen(n):
    """Produce one random pair of float32 arrays.

    Args:
        n: Value handed over by the dataset ``py_func`` map; it is accepted
            only to satisfy that call signature and is not used.

    Returns:
        Tuple ``(a, b)`` of ``np.float32`` arrays with shapes ``(5, 5, 3)``
        and ``(5, 5, 1)``.
    """
    # ``astype`` already returns a freshly allocated array, so no extra
    # ``np.array`` copy is needed.
    a = np.random.rand(5, 5, 3).astype(np.float32)
    b = np.random.rand(5, 5, 1).astype(np.float32)
    return (a, b)
def my_input_fn_train():
    """Build the training input pipeline used by ``TrainSpec``.

    Maps ``Dataset.range(100000)`` through ``gen`` (via ``tf.py_func``),
    fixes the static shapes with ``batch_reshape``, caches the elements on
    the filesystem under the prefix ``/tmp/mycache_train`` and repeats the
    cached dataset 3 times.

    NOTE: every call creates a new one-shot iterator over the same cache
    prefix. The reported ``AlreadyExistsError`` names
    ``/tmp/mycache_train.lockfile``, i.e. the lockfile of this cache.

    Returns:
        Tuple ``(features, labels)`` of dicts keyed ``"input_rgb"`` and
        ``"softmax"`` respectively.
    """
    ds = tf.data.Dataset.range(100000)
    ds = ds.map(
        lambda x: tf.py_func(func=gen, inp=[x], Tout=[tf.float32, tf.float32]))
    ds = ds.map(batch_reshape)
    # File-based cache: this is the part the issue is about.
    ds = ds.cache("/tmp/mycache_train")
    ds = ds.repeat(3)
    value = ds.make_one_shot_iterator().get_next()
    return {"input_rgb": value[0]}, {"softmax": value[1]}
def my_input_fn_eval():
    """Build the evaluation input pipeline used by ``EvalSpec``.

    Same pipeline as the training one, except that the elements are cached
    on the filesystem under the prefix ``/tmp/mycache_eval``:
    ``Dataset.range(100000)`` is mapped through ``gen`` (via ``tf.py_func``),
    reshaped by ``batch_reshape``, cached and repeated 3 times.

    Returns:
        Tuple ``(features, labels)`` of dicts keyed ``"input_rgb"`` and
        ``"softmax"`` respectively.
    """
    ds = tf.data.Dataset.range(100000)
    ds = ds.map(
        lambda x: tf.py_func(func=gen, inp=[x], Tout=[tf.float32, tf.float32]))
    ds = ds.map(batch_reshape)
    # File-based cache with its own prefix, separate from the training cache.
    ds = ds.cache("/tmp/mycache_eval")
    ds = ds.repeat(3)
    value = ds.make_one_shot_iterator().get_next()
    return {"input_rgb": value[0]}, {"softmax": value[1]}
def main():
    """Reproduce the cache-lockfile error with ``train_and_evaluate``.

    Builds a two-layer Keras model, converts it to an Estimator and runs
    ``tf.estimator.train_and_evaluate`` with the two file-cached input
    functions.
    """
    tf.logging.set_verbosity(tf.logging.INFO)

    input_rgb = ks.layers.Input(shape=(1, 5, 5, 3), name="input_rgb")
    x = ks.layers.Dense(1, activation='relu', name="Dense_1")(input_rgb)
    x = ks.layers.Dense(1, activation='softmax', name="softmax")(x)
    model = ks.models.Model(inputs=[input_rgb], outputs=[x])
    model.compile(
        loss={'softmax': 'binary_crossentropy'},
        optimizer=tf.keras.optimizers.Adam())

    estimator = ks.estimator.model_to_estimator(keras_model=model)

    train_spec = tf.estimator.TrainSpec(input_fn=my_input_fn_train)
    # throttle_secs=2: per the log output, evaluation happens after 2 secs
    # of training, i.e. before the 100000-element train cache is complete.
    eval_spec = tf.estimator.EvalSpec(
        input_fn=my_input_fn_eval, steps=5, throttle_secs=2)

    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
# Run the reproduction only when this file is executed as a script.
if __name__ == "__main__":
    main()
And here is the output:
INFO:tensorflow:Using the Keras model from memory.
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpd1r865ry
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpd1r865ry', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fcb48f9ecf8>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after 2 secs (eval_spec.throttle_secs) or training is finished.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpd1r865ry/keras_model.ckpt
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 1 into /tmp/tmpd1r865ry/model.ckpt.
INFO:tensorflow:loss = 9.25762, step = 1
INFO:tensorflow:global_step/sec: 612.893
INFO:tensorflow:loss = 9.348354, step = 101 (0.166 sec)
INFO:tensorflow:global_step/sec: 952.69
INFO:tensorflow:loss = 8.319871, step = 201 (0.108 sec)
INFO:tensorflow:global_step/sec: 838.883
INFO:tensorflow:loss = 7.7207212, step = 301 (0.119 sec)
INFO:tensorflow:global_step/sec: 917.55
INFO:tensorflow:loss = 8.657611, step = 401 (0.110 sec)
INFO:tensorflow:global_step/sec: 866.062
INFO:tensorflow:loss = 8.567427, step = 501 (0.118 sec)
INFO:tensorflow:global_step/sec: 862.789
INFO:tensorflow:loss = 7.4227247, step = 601 (0.110 sec)
INFO:tensorflow:global_step/sec: 847.577
INFO:tensorflow:loss = 6.676866, step = 701 (0.118 sec)
INFO:tensorflow:global_step/sec: 911.827
INFO:tensorflow:loss = 7.4915752, step = 801 (0.112 sec)
INFO:tensorflow:global_step/sec: 837.15
INFO:tensorflow:loss = 7.7332606, step = 901 (0.120 sec)
INFO:tensorflow:global_step/sec: 857.915
INFO:tensorflow:loss = 7.283838, step = 1001 (0.116 sec)
INFO:tensorflow:global_step/sec: 786.144
INFO:tensorflow:loss = 8.61713, step = 1101 (0.127 sec)
INFO:tensorflow:Saving checkpoints for 1150 into /tmp/tmpd1r865ry/model.ckpt.
INFO:tensorflow:Loss for final step: 8.417924.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2018-04-05-15:36:08
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpd1r865ry/model.ckpt-1150
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/5]
INFO:tensorflow:Evaluation [2/5]
INFO:tensorflow:Evaluation [3/5]
INFO:tensorflow:Evaluation [4/5]
INFO:tensorflow:Evaluation [5/5]
INFO:tensorflow:Finished evaluation at 2018-04-05-15:36:08
INFO:tensorflow:Saving dict for global step 1150: global_step = 1150, loss = 8.261229
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpd1r865ry/model.ckpt-1150
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
---------------------------------------------------------------------------
AlreadyExistsError Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args)
1360 try:
-> 1361 return fn(*args)
1362 except errors.OpError as e:
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _run_fn(session, feed_dict, fetch_list, target_list, options, run_metadata)
1339 return tf_session.TF_Run(session, options, feed_dict, fetch_list,
-> 1340 target_list, status, run_metadata)
1341
/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/errors_impl.py in __exit__(self, type_arg, value_arg, traceback_arg)
515 compat.as_text(c_api.TF_Message(self.status.status)),
--> 516 c_api.TF_GetCode(self.status.status))
517 # Delete the underlying status object from memory otherwise it stays alive
AlreadyExistsError: There appears to be a concurrent caching iterator running - cache lockfile already exists ('/tmp/mycache_train.lockfile'). If you are sure no other running TF computations are using this cache prefix, delete the lockfile and re-initialize the iterator. Lockfile contents: Created at: 1522942566
[[Node: IteratorGetNext = IteratorGetNext[output_shapes=[[?,?,3], [?,?,1]], output_types=[DT_FLOAT, DT_FLOAT], _device="/job:localhost/replica:0/task:0/device:CPU:0"](OneShotIterator)]]
During handling of the above exception, another exception occurred:
AlreadyExistsError Traceback (most recent call last)
<ipython-input-5-9d8332f50ddd> in <module>()
66
67 if __name__ == "__main__":
---> 68 main()
<ipython-input-5-9d8332f50ddd> in main()
62 input_fn=my_input_fn_eval, steps=5, throttle_secs=2)
63
---> 64 tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
65
66
/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py in train_and_evaluate(estimator, train_spec, eval_spec)
419 '(with task id 0). Given task id {}'.format(config.task_id))
420
--> 421 executor.run()
422
423
/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py in run(self)
492 config.task_type != run_config_lib.TaskType.EVALUATOR):
493 logging.info('Running training and evaluation locally (non-distributed).')
--> 494 self.run_local()
495 return
496
/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py in run_local(self)
624 input_fn=self._train_spec.input_fn,
625 max_steps=self._train_spec.max_steps,
--> 626 hooks=train_hooks)
627
628 # Final export signal: For any eval result with global_step >= train
/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py in train(self, input_fn, hooks, steps, max_steps, saving_listeners)
350
351 saving_listeners = _check_listeners_type(saving_listeners)
--> 352 loss = self._train_model(input_fn, hooks, saving_listeners)
353 logging.info('Loss for final step: %s.', loss)
354 return self
/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py in _train_model(self, input_fn, hooks, saving_listeners)
889 loss = None
890 while not mon_sess.should_stop():
--> 891 _, loss = mon_sess.run([estimator_spec.train_op, estimator_spec.loss])
892 return loss
893
/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
544 feed_dict=feed_dict,
545 options=options,
--> 546 run_metadata=run_metadata)
547
548 def run_step_fn(self, step_fn):
/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
1020 feed_dict=feed_dict,
1021 options=options,
-> 1022 run_metadata=run_metadata)
1023 except _PREEMPTION_ERRORS as e:
1024 logging.info('An error was raised. This may be due to a preemption in '
/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)
1111 raise six.reraise(*original_exc_info)
1112 else:
-> 1113 raise six.reraise(*original_exc_info)
1114
1115
/usr/local/lib/python3.6/dist-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)
1096 def run(self, *args, **kwargs):
1097 try:
-> 1098 return self._sess.run(*args, **kwargs)
1099 except _PREEMPTION_ERRORS:
1100 raise
/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
1168 feed_dict=feed_dict,
1169 options=options,
-> 1170 run_metadata=run_metadata)
1171
1172 for hook in self._hooks:
/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)
948
949 def run(self, *args, **kwargs):
--> 950 return self._sess.run(*args, **kwargs)
951
952 def run_step_fn(self, step_fn, raw_session, run_with_hooks):
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in run(self, fetches, feed_dict, options, run_metadata)
903 try:
904 result = self._run(None, fetches, feed_dict, options_ptr,
--> 905 run_metadata_ptr)
906 if run_metadata:
907 proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata)
1135 if final_fetches or final_targets or (handle and feed_dict_tensor):
1136 results = self._do_run(handle, final_targets, final_fetches,
-> 1137 feed_dict_tensor, options, run_metadata)
1138 else:
1139 results = []
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata)
1353 if handle is None:
1354 return self._do_call(_run_fn, self._session, feeds, fetches, targets,
-> 1355 options, run_metadata)
1356 else:
1357 return self._do_call(_prun_fn, self._session, handle, feeds, fetches)
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args)
1372 except KeyError:
1373 pass
-> 1374 raise type(e)(node_def, op, message)
1375
1376 def _extend_graph(self):
AlreadyExistsError: There appears to be a concurrent caching iterator running - cache lockfile already exists ('/tmp/mycache_train.lockfile'). If you are sure no other running TF computations are using this cache prefix, delete the lockfile and re-initialize the iterator. Lockfile contents: Created at: 1522942566
[[Node: IteratorGetNext = IteratorGetNext[output_shapes=[[?,?,3], [?,?,1]], output_types=[DT_FLOAT, DT_FLOAT], _device="/job:localhost/replica:0/task:0/device:CPU:0"](OneShotIterator)]]
Caused by op 'IteratorGetNext', defined at:
File "/usr/lib/python3.6/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/usr/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py", line 16, in <module>
app.launch_new_instance()
File "/usr/local/lib/python3.6/dist-packages/traitlets/config/application.py", line 658, in launch_instance
app.start()
File "/usr/local/lib/python3.6/dist-packages/ipykernel/kernelapp.py", line 477, in start
ioloop.IOLoop.instance().start()
File "/usr/local/lib/python3.6/dist-packages/zmq/eventloop/ioloop.py", line 177, in start
super(ZMQIOLoop, self).start()
File "/usr/local/lib/python3.6/dist-packages/tornado/ioloop.py", line 888, in start
handler_func(fd_obj, events)
File "/usr/local/lib/python3.6/dist-packages/tornado/stack_context.py", line 277, in null_wrapper
return fn(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/zmq/eventloop/zmqstream.py", line 440, in _handle_events
self._handle_recv()
File "/usr/local/lib/python3.6/dist-packages/zmq/eventloop/zmqstream.py", line 472, in _handle_recv
self._run_callback(callback, msg)
File "/usr/local/lib/python3.6/dist-packages/zmq/eventloop/zmqstream.py", line 414, in _run_callback
callback(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/tornado/stack_context.py", line 277, in null_wrapper
return fn(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/ipykernel/kernelbase.py", line 283, in dispatcher
return self.dispatch_shell(stream, msg)
File "/usr/local/lib/python3.6/dist-packages/ipykernel/kernelbase.py", line 235, in dispatch_shell
handler(stream, idents, msg)
File "/usr/local/lib/python3.6/dist-packages/ipykernel/kernelbase.py", line 399, in execute_request
user_expressions, allow_stdin)
File "/usr/local/lib/python3.6/dist-packages/ipykernel/ipkernel.py", line 196, in do_execute
res = shell.run_cell(code, store_history=store_history, silent=silent)
File "/usr/local/lib/python3.6/dist-packages/ipykernel/zmqshell.py", line 533, in run_cell
return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2718, in run_cell
interactivity=interactivity, compiler=compiler, result=result)
File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2822, in run_ast_nodes
if self.run_code(code, result):
File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2882, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-5-9d8332f50ddd>", line 68, in <module>
main()
File "<ipython-input-5-9d8332f50ddd>", line 64, in main
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py", line 421, in train_and_evaluate
executor.run()
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py", line 494, in run
self.run_local()
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py", line 626, in run_local
hooks=train_hooks)
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py", line 352, in train
loss = self._train_model(input_fn, hooks, saving_listeners)
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py", line 809, in _train_model
input_fn, model_fn_lib.ModeKeys.TRAIN))
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py", line 668, in _get_features_and_labels_from_input_fn
result = self._call_input_fn(input_fn, mode)
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py", line 760, in _call_input_fn
return input_fn(**kwargs)
File "<ipython-input-5-9d8332f50ddd>", line 28, in my_input_fn_train
value = ds.make_one_shot_iterator().get_next()
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/data/ops/iterator_ops.py", line 330, in get_next
name=name)), self._output_types,
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/gen_dataset_ops.py", line 866, in iterator_get_next
output_shapes=output_shapes, name=name)
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper
op_def=op_def)
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py", line 3271, in create_op
op_def=op_def)
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py", line 1650, in __init__
self._traceback = self._graph._extract_stack() # pylint: disable=protected-access
AlreadyExistsError (see above for traceback): There appears to be a concurrent caching iterator running - cache lockfile already exists ('/tmp/mycache_train.lockfile'). If you are sure no other running TF computations are using this cache prefix, delete the lockfile and re-initialize the iterator. Lockfile contents: Created at: 1522942566
[[Node: IteratorGetNext = IteratorGetNext[output_shapes=[[?,?,3], [?,?,1]], output_types=[DT_FLOAT, DT_FLOAT], _device="/job:localhost/replica:0/task:0/device:CPU:0"](OneShotIterator)]]
About this issue
- Original URL
- State: closed
- Created 6 years ago
- Reactions: 1
- Comments: 22 (18 by maintainers)
@gadagashwini the `overwrite` kwarg does not exist, it's just a proposal: https://github.com/tensorflow/tensorflow/blob/r1.14/tensorflow/python/data/ops/dataset_ops.py#L1738-L1740

`overwrite` would be helpful but wouldn't solve the concurrency problem. The op needs a do-over.

+1, this is a bug in the design. Cache files should support concurrent usage, since one can easily get there via `tf.data.experimental.parallel_interleave()`.