tensorflow: Cache lockfile already exists

When I use the train_and_evaluate method from the Estimator API together with an on-disk cache (tf.data.Dataset.cache with a filename), I get an error because evaluation starts before the cache has been fully written to the filesystem. When training then runs a second time, after the evaluation, it fails because it finds the leftover lockfile. If the cache is fully written before the first evaluation, I don't get the error.
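
A workaround consistent with that last observation is to materialize the cache completely before calling train_and_evaluate, so the lockfile is released up front. A minimal sketch of such a pre-warming step (the warm_cache helper is hypothetical, TF 1.x API; build_dataset would construct the uncached pipeline, i.e. the range/map/map part of the input functions below):

import tensorflow as tf

def warm_cache(build_dataset, cache_path):
    """Hypothetical helper: iterate a cached dataset to completion once,
    so the cache files are fully written and the lockfile is released
    before training starts."""
    with tf.Graph().as_default():
        ds = build_dataset().cache(cache_path)
        next_element = ds.make_one_shot_iterator().get_next()
        with tf.Session() as sess:
            try:
                while True:
                    sess.run(next_element)
            except tf.errors.OutOfRangeError:
                pass  # end of dataset: the cache is complete on disk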

Here is a snippet that reproduces the issue, runnable on Colab:

import tensorflow as tf
import numpy as np
from tensorflow import keras as ks
import itertools


def batch_reshape(a, b):
    # py_func drops static shape information, so restore it here.
    a.set_shape([None, None, 3])
    b.set_shape([None, None, 1])
    return (a, b)


def gen(n):
    # n is the element index passed in by py_func; it is unused.
    a = np.random.rand(5, 5, 3).astype(np.float32)
    b = np.random.rand(5, 5, 1).astype(np.float32)
    return (a, b)


def my_input_fn_train():
    ds = tf.data.Dataset.range(100000)
    ds = ds.map(
        lambda x: tf.py_func(func=gen, inp=[x], Tout=[tf.float32, tf.float32]))
    ds = ds.map(batch_reshape)
    ds = ds.cache("/tmp/mycache_train")

    ds = ds.repeat(3)

    value = ds.make_one_shot_iterator().get_next()

    return {"input_rgb": value[0]}, {"softmax": value[1]}


def my_input_fn_eval():
    ds = tf.data.Dataset.range(100000)
    ds = ds.map(
        lambda x: tf.py_func(func=gen, inp=[x], Tout=[tf.float32, tf.float32]))
    ds = ds.map(batch_reshape)
    ds = ds.cache("/tmp/mycache_eval")

    ds = ds.repeat(3)

    value = ds.make_one_shot_iterator().get_next()

    return {"input_rgb": value[0]}, {"softmax": value[1]}


def main():
    tf.logging.set_verbosity(tf.logging.INFO)
    input_rgb = ks.layers.Input(shape=(1, 5, 5, 3), name="input_rgb")
    x = ks.layers.Dense(1, activation='relu', name="Dense_1")(input_rgb)
    x = ks.layers.Dense(1, activation='softmax', name="softmax")(x)
    model = ks.models.Model(inputs=[input_rgb], outputs=[x])
    model.compile(
        loss={'softmax': 'binary_crossentropy'},
        optimizer=tf.keras.optimizers.Adam())

    estimator = ks.estimator.model_to_estimator(keras_model=model)

    train_spec = tf.estimator.TrainSpec(input_fn=my_input_fn_train)

    eval_spec = tf.estimator.EvalSpec(
        input_fn=my_input_fn_eval, steps=5, throttle_secs=2)

    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)


if __name__ == "__main__":
    main()

and here is the output:

INFO:tensorflow:Using the Keras model from memory.
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpd1r865ry
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpd1r865ry', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fcb48f9ecf8>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after 2 secs (eval_spec.throttle_secs) or training is finished.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpd1r865ry/keras_model.ckpt
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 1 into /tmp/tmpd1r865ry/model.ckpt.
INFO:tensorflow:loss = 9.25762, step = 1
INFO:tensorflow:global_step/sec: 612.893
INFO:tensorflow:loss = 9.348354, step = 101 (0.166 sec)
INFO:tensorflow:global_step/sec: 952.69
INFO:tensorflow:loss = 8.319871, step = 201 (0.108 sec)
INFO:tensorflow:global_step/sec: 838.883
INFO:tensorflow:loss = 7.7207212, step = 301 (0.119 sec)
INFO:tensorflow:global_step/sec: 917.55
INFO:tensorflow:loss = 8.657611, step = 401 (0.110 sec)
INFO:tensorflow:global_step/sec: 866.062
INFO:tensorflow:loss = 8.567427, step = 501 (0.118 sec)
INFO:tensorflow:global_step/sec: 862.789
INFO:tensorflow:loss = 7.4227247, step = 601 (0.110 sec)
INFO:tensorflow:global_step/sec: 847.577
INFO:tensorflow:loss = 6.676866, step = 701 (0.118 sec)
INFO:tensorflow:global_step/sec: 911.827
INFO:tensorflow:loss = 7.4915752, step = 801 (0.112 sec)
INFO:tensorflow:global_step/sec: 837.15
INFO:tensorflow:loss = 7.7332606, step = 901 (0.120 sec)
INFO:tensorflow:global_step/sec: 857.915
INFO:tensorflow:loss = 7.283838, step = 1001 (0.116 sec)
INFO:tensorflow:global_step/sec: 786.144
INFO:tensorflow:loss = 8.61713, step = 1101 (0.127 sec)
INFO:tensorflow:Saving checkpoints for 1150 into /tmp/tmpd1r865ry/model.ckpt.
INFO:tensorflow:Loss for final step: 8.417924.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2018-04-05-15:36:08
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpd1r865ry/model.ckpt-1150
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/5]
INFO:tensorflow:Evaluation [2/5]
INFO:tensorflow:Evaluation [3/5]
INFO:tensorflow:Evaluation [4/5]
INFO:tensorflow:Evaluation [5/5]
INFO:tensorflow:Finished evaluation at 2018-04-05-15:36:08

INFO:tensorflow:Saving dict for global step 1150: global_step = 1150, loss = 8.261229
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpd1r865ry/model.ckpt-1150
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.

---------------------------------------------------------------------------
AlreadyExistsError                        Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args)
   1360     try:
-> 1361       return fn(*args)
   1362     except errors.OpError as e:

/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _run_fn(session, feed_dict, fetch_list, target_list, options, run_metadata)
   1339           return tf_session.TF_Run(session, options, feed_dict, fetch_list,
-> 1340                                    target_list, status, run_metadata)
   1341 

/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/errors_impl.py in __exit__(self, type_arg, value_arg, traceback_arg)
    515             compat.as_text(c_api.TF_Message(self.status.status)),
--> 516             c_api.TF_GetCode(self.status.status))
    517     # Delete the underlying status object from memory otherwise it stays alive

AlreadyExistsError: There appears to be a concurrent caching iterator running - cache lockfile already exists ('/tmp/mycache_train.lockfile'). If you are sure no other running TF computations are using this cache prefix, delete the lockfile and re-initialize the iterator. Lockfile contents: Created at: 1522942566
	 [[Node: IteratorGetNext = IteratorGetNext[output_shapes=[[?,?,3], [?,?,1]], output_types=[DT_FLOAT, DT_FLOAT], _device="/job:localhost/replica:0/task:0/device:CPU:0"](OneShotIterator)]]

During handling of the above exception, another exception occurred:

AlreadyExistsError                        Traceback (most recent call last)
<ipython-input-5-9d8332f50ddd> in <module>()
     66 
     67 if __name__ == "__main__":
---> 68     main()

<ipython-input-5-9d8332f50ddd> in main()
     62         input_fn=my_input_fn_eval, steps=5, throttle_secs=2)
     63 
---> 64     tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
     65 
     66 

/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py in train_and_evaluate(estimator, train_spec, eval_spec)
    419         '(with task id 0).  Given task id {}'.format(config.task_id))
    420 
--> 421   executor.run()
    422 
    423 

/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py in run(self)
    492         config.task_type != run_config_lib.TaskType.EVALUATOR):
    493       logging.info('Running training and evaluation locally (non-distributed).')
--> 494       self.run_local()
    495       return
    496 

/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py in run_local(self)
    624           input_fn=self._train_spec.input_fn,
    625           max_steps=self._train_spec.max_steps,
--> 626           hooks=train_hooks)
    627 
    628       # Final export signal: For any eval result with global_step >= train

/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py in train(self, input_fn, hooks, steps, max_steps, saving_listeners)
    350 
    351     saving_listeners = _check_listeners_type(saving_listeners)
--> 352     loss = self._train_model(input_fn, hooks, saving_listeners)
    353     logging.info('Loss for final step: %s.', loss)
    354     return self

/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py in _train_model(self, input_fn, hooks, saving_listeners)
    889         loss = None
    890         while not mon_sess.should_stop():
--> 891           _, loss = mon_sess.run([estimator_spec.train_op, estimator_spec.loss])
    892       return loss
    893 

/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
    544                           feed_dict=feed_dict,
    545                           options=options,
--> 546                           run_metadata=run_metadata)
    547 
    548   def run_step_fn(self, step_fn):

/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
   1020                               feed_dict=feed_dict,
   1021                               options=options,
-> 1022                               run_metadata=run_metadata)
   1023       except _PREEMPTION_ERRORS as e:
   1024         logging.info('An error was raised. This may be due to a preemption in '

/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)
   1111         raise six.reraise(*original_exc_info)
   1112       else:
-> 1113         raise six.reraise(*original_exc_info)
   1114 
   1115 

/usr/local/lib/python3.6/dist-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)
   1096   def run(self, *args, **kwargs):
   1097     try:
-> 1098       return self._sess.run(*args, **kwargs)
   1099     except _PREEMPTION_ERRORS:
   1100       raise

/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
   1168                                   feed_dict=feed_dict,
   1169                                   options=options,
-> 1170                                   run_metadata=run_metadata)
   1171 
   1172     for hook in self._hooks:

/usr/local/lib/python3.6/dist-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)
    948 
    949   def run(self, *args, **kwargs):
--> 950     return self._sess.run(*args, **kwargs)
    951 
    952   def run_step_fn(self, step_fn, raw_session, run_with_hooks):

/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in run(self, fetches, feed_dict, options, run_metadata)
    903     try:
    904       result = self._run(None, fetches, feed_dict, options_ptr,
--> 905                          run_metadata_ptr)
    906       if run_metadata:
    907         proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata)
   1135     if final_fetches or final_targets or (handle and feed_dict_tensor):
   1136       results = self._do_run(handle, final_targets, final_fetches,
-> 1137                              feed_dict_tensor, options, run_metadata)
   1138     else:
   1139       results = []

/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata)
   1353     if handle is None:
   1354       return self._do_call(_run_fn, self._session, feeds, fetches, targets,
-> 1355                            options, run_metadata)
   1356     else:
   1357       return self._do_call(_prun_fn, self._session, handle, feeds, fetches)

/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args)
   1372         except KeyError:
   1373           pass
-> 1374       raise type(e)(node_def, op, message)
   1375 
   1376   def _extend_graph(self):

AlreadyExistsError: There appears to be a concurrent caching iterator running - cache lockfile already exists ('/tmp/mycache_train.lockfile'). If you are sure no other running TF computations are using this cache prefix, delete the lockfile and re-initialize the iterator. Lockfile contents: Created at: 1522942566
	 [[Node: IteratorGetNext = IteratorGetNext[output_shapes=[[?,?,3], [?,?,1]], output_types=[DT_FLOAT, DT_FLOAT], _device="/job:localhost/replica:0/task:0/device:CPU:0"](OneShotIterator)]]

Caused by op 'IteratorGetNext', defined at:
  File "/usr/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "/usr/local/lib/python3.6/dist-packages/traitlets/config/application.py", line 658, in launch_instance
    app.start()
  File "/usr/local/lib/python3.6/dist-packages/ipykernel/kernelapp.py", line 477, in start
    ioloop.IOLoop.instance().start()
  File "/usr/local/lib/python3.6/dist-packages/zmq/eventloop/ioloop.py", line 177, in start
    super(ZMQIOLoop, self).start()
  File "/usr/local/lib/python3.6/dist-packages/tornado/ioloop.py", line 888, in start
    handler_func(fd_obj, events)
  File "/usr/local/lib/python3.6/dist-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/zmq/eventloop/zmqstream.py", line 440, in _handle_events
    self._handle_recv()
  File "/usr/local/lib/python3.6/dist-packages/zmq/eventloop/zmqstream.py", line 472, in _handle_recv
    self._run_callback(callback, msg)
  File "/usr/local/lib/python3.6/dist-packages/zmq/eventloop/zmqstream.py", line 414, in _run_callback
    callback(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/ipykernel/kernelbase.py", line 283, in dispatcher
    return self.dispatch_shell(stream, msg)
  File "/usr/local/lib/python3.6/dist-packages/ipykernel/kernelbase.py", line 235, in dispatch_shell
    handler(stream, idents, msg)
  File "/usr/local/lib/python3.6/dist-packages/ipykernel/kernelbase.py", line 399, in execute_request
    user_expressions, allow_stdin)
  File "/usr/local/lib/python3.6/dist-packages/ipykernel/ipkernel.py", line 196, in do_execute
    res = shell.run_cell(code, store_history=store_history, silent=silent)
  File "/usr/local/lib/python3.6/dist-packages/ipykernel/zmqshell.py", line 533, in run_cell
    return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2718, in run_cell
    interactivity=interactivity, compiler=compiler, result=result)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2822, in run_ast_nodes
    if self.run_code(code, result):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2882, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-5-9d8332f50ddd>", line 68, in <module>
    main()
  File "<ipython-input-5-9d8332f50ddd>", line 64, in main
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py", line 421, in train_and_evaluate
    executor.run()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py", line 494, in run
    self.run_local()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/training.py", line 626, in run_local
    hooks=train_hooks)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py", line 352, in train
    loss = self._train_model(input_fn, hooks, saving_listeners)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py", line 809, in _train_model
    input_fn, model_fn_lib.ModeKeys.TRAIN))
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py", line 668, in _get_features_and_labels_from_input_fn
    result = self._call_input_fn(input_fn, mode)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/estimator/estimator.py", line 760, in _call_input_fn
    return input_fn(**kwargs)
  File "<ipython-input-5-9d8332f50ddd>", line 28, in my_input_fn_train
    value = ds.make_one_shot_iterator().get_next()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/data/ops/iterator_ops.py", line 330, in get_next
    name=name)), self._output_types,
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/gen_dataset_ops.py", line 866, in iterator_get_next
    output_shapes=output_shapes, name=name)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper
    op_def=op_def)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py", line 3271, in create_op
    op_def=op_def)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py", line 1650, in __init__
    self._traceback = self._graph._extract_stack()  # pylint: disable=protected-access

AlreadyExistsError (see above for traceback): There appears to be a concurrent caching iterator running - cache lockfile already exists ('/tmp/mycache_train.lockfile'). If you are sure no other running TF computations are using this cache prefix, delete the lockfile and re-initialize the iterator. Lockfile contents: Created at: 1522942566
	 [[Node: IteratorGetNext = IteratorGetNext[output_shapes=[[?,?,3], [?,?,1]], output_types=[DT_FLOAT, DT_FLOAT], _device="/job:localhost/replica:0/task:0/device:CPU:0"](OneShotIterator)]]

About this issue

  • State: closed
  • Created 6 years ago
  • Reactions: 1
  • Comments: 22 (18 by maintainers)

Most upvoted comments

@gadagashwini the overwrite kwarg does not exist; it is just a proposal: https://github.com/tensorflow/tensorflow/blob/r1.14/tensorflow/python/data/ops/dataset_ops.py#L1738-L1740

overwrite would be helpful, but it wouldn't solve the concurrency problem. The op needs a do-over.
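
Until then, the error message itself points to the manual escape hatch: if you are sure no other TF computation is using the cache prefix, delete the stale lockfile before re-initializing the iterator. Roughly (using the cache paths from the repro above):

import glob
import os

# Remove stale lockfiles left behind by an interrupted caching iterator.
# Only safe if nothing else is currently using these cache prefixes.
for lockfile in glob.glob("/tmp/mycache_*.lockfile"):
    os.remove(lockfile)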

+1, this is a bug in the design. Cache files should support concurrent usage, since one can easily end up there via tf.data.experimental.parallel_interleave().
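
For illustration, a sketch of how the collision arises and one way around it, assuming the TF 1.14-era tf.data.experimental API (make_shard is a made-up example): if every interleaved branch cached to one shared prefix, the concurrent iterators would race on the same lockfile, whereas giving each branch its own prefix avoids the clash.

import tensorflow as tf

def make_shard(i):
    shard = tf.data.Dataset.range(i * 10, (i + 1) * 10)
    # One cache file per branch; a single shared prefix here would make
    # the concurrent caching iterators race on the same .lockfile.
    cache_path = tf.strings.join(
        ["/tmp/shard_cache_", tf.strings.as_string(i)])
    return shard.cache(cache_path)

ds = tf.data.Dataset.range(4).apply(
    tf.data.experimental.parallel_interleave(make_shard, cycle_length=4))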