tensorflow: FailedPreconditionError when restoring initializable_iterator with Scaffold in a MonitoredTrainingSession for the second time.

System information

  • Have I written custom code (as opposed to using a stock example script provided in TensorFlow): +
  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): macOS
  • TensorFlow installed from (source or binary): binary
  • TensorFlow version (use command below): v1.3.0-rc2-20-g0787eee 1.3.0
  • Python version: Python 3.5.1
  • Bazel version (if compiling from source): -
  • CUDA/cuDNN version: -
  • GPU model and memory: -
  • Exact command to reproduce: -

Context:

I'm using an initializable_iterator with MonitoredTrainingSession because the graph contains stateful lookup_ops.index_table_from_tensor() lookup tables, which don't work with a one_shot_iterator.

The initializable_iterator is initialized through a tf.train.Scaffold():

from tensorflow.python.ops import control_flow_ops, resources, variables

Scaffold = tf.train.Scaffold(
        init_op=control_flow_ops.group(variables.global_variables_initializer(),
                                       resources.initialize_resources(resources.shared_resources()),
                                       iter_init_op))

with tf.train.MonitoredTrainingSession(
    master=server.target,
    is_chief=hps.is_chief,
    scaffold=Scaffold,
    config=config,
    checkpoint_dir=hps.checkpoint_dir,
    hooks=hooks
) as mon_sess:
    ...

Where iter_init_op is equivalent to iterator.initializer.
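
For reference, here is a minimal sketch of how such a pipeline might be wired up. The vocabulary, file path, and parse logic below are illustrative placeholders rather than the actual code from this issue, and tf.data assumes TF >= 1.4 (on 1.3 the same API lives under tf.contrib.data):

import tensorflow as tf
from tensorflow.python.ops import lookup_ops

# A stateful lookup table: this is what rules out make_one_shot_iterator(),
# which cannot capture stateful ops.
table = lookup_ops.index_table_from_tensor(tf.constant(['<unk>', 'foo', 'bar']))

# Illustrative input pipeline: map each line of a text file to its vocabulary id.
dataset = tf.data.TextLineDataset(['/path/to/train.txt'])
dataset = dataset.map(table.lookup)
dataset = dataset.batch(32)

# The initializable iterator whose initializer becomes iter_init_op.
# (The table itself still needs lookup_ops.tables_initializer(), which is
# what ends up in Scaffold.local_init_op later in this thread.)
iterator = dataset.make_initializable_iterator()
iter_init_op = iterator.initializer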

Problem

The above initialization works properly when the model is created for the first time, and some initial training can be done without problems.

If the chief worker crashes or is shut down purposefully, then after restarting, MonitoredTrainingSession shows the following error, as if the iterator were not initialized:

FailedPreconditionError (see above for traceback): GetNext() failed because the iterator has not been initialized. Ensure that you have run the initializer operation for this iterator before getting the next element...

Workaround

Right now the only solution that works for me is to run the initialization internally through the private _coordinated_creator.tf_sess.run:

mon_sess._coordinated_creator.tf_sess.run(iter_init_op)

This doesn’t look like an intended use.

Statement:

This doesn’t seem like intended behaviour. What is a better way to use an initializable_iterator with MonitoredTrainingSession, or lookup_ops.index_table_from_tensor with a one_shot_iterator?

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 32 (13 by maintainers)

Most upvoted comments

Related to @dongjk’s question/problem, I have an explicit example of loading tfrecords with the Dataset API, built on a modified version of the tensorflow/models/research/resnet example, using @ispirmustafa’s suggestion of a hook (thank you for that suggestion btw, it solved a very long headache 😃). Hopefully it helps “connect the dots” in case anyone like me is having trouble piecing all of this together and making a working example with their own data. I really like the Dataset way of doing things and am looking forward to seeing how it develops further. Thanks!

import tensorflow as tf

...

train_tfrecord_path = '/path/to/my/train-000.tfrecord'
train_tfrecord_filenames = [train_tfrecord_path] # Add more to the list if you need
val_tfrecord_path = '/path/to/my/val-000.tfrecord'
val_tfrecord_filenames = [val_tfrecord_path] # Add more to the list if you need

# Make a placeholder; we will use a feed_dict to fill this with the filenames of the tfrecords
filenames_placeholder = tf.placeholder(tf.string, shape=[None])

# Create the `Dataset` and apply some preprocessing
dataset = tf.data.TFRecordDataset(filenames_placeholder)
dataset = dataset.map(_my_parse_function, num_parallel_calls=4)
dataset = dataset.repeat()
# This method of batching assures a fixed batch size and avoids problems
# of unknown shapes [?, num_classes] (for labels), otherwise can use `dataset.batch(batch_size)`
dataset = dataset.apply(tf.contrib.data.batch_and_drop_remainder(FLAGS.batch_size))
dataset = dataset.prefetch(1)

# Create the `Iterator`, `Initializer` and get the images and labels for building the model
iterator = tf.data.Iterator.from_structure(dataset.output_types, dataset.output_shapes)
init_train = iterator.make_initializer(dataset)
images, labels = iterator.get_next()

...

# Create the hook to initialize the Iterator with the filenames_list, credit to @ispirmustafa
class _DatasetInitializerHook(tf.train.SessionRunHook):
    def __init__(self, initializer, filenames_list):
        self._initializer = initializer
        self._filenames_list = filenames_list
    def begin(self):
        pass
    def after_create_session(self, session, coord):
        del coord
        session.run(self._initializer, feed_dict={filenames_placeholder: self._filenames_list})

...

# Define the training function. Modified version of the wide ResNet CIFAR example from https://github.com/tensorflow/models/tree/master/research/resnet
def train(hps):
    # `images` and `labels` are the previous output from our call to `iterator.get_next()`
    model = resnet_model.ResNet(hps, images, labels, FLAGS.mode, batch_size=FLAGS.batch_size)
    model.build_graph()

    # Make instance of `_DatasetInitializerHook`
    initializer_hook = _DatasetInitializerHook(init_train, train_tfrecord_filenames)
    ...
    # After everything else is prepared, prepend the `initializer_hook` to the hooks in `MonitoredTrainingSession`
    with tf.train.MonitoredTrainingSession(
            checkpoint_dir=FLAGS.log_root,
            hooks=[initializer_hook, logging_hook, _LearningRateSetterHook()],
            chief_only_hooks=[summary_hook],
            save_summaries_steps=0,
            config=tf.ConfigProto(allow_soft_placement=True)) as mon_sess:
        while not mon_sess.should_stop():
            mon_sess.run(model.train_op)

...

def evaluate(hps):
    # Modify `evaluate` the same way, but use an initializer for the validation dataset, or feed `val_tfrecord_filenames`
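    # (Sketch, not from the original comment: alongside `init_train` above you could
    #  create `init_val = iterator.make_initializer(dataset_val)` from a validation
    #  Dataset built on `val_tfrecord_filenames`, and pass it to
    #  `_DatasetInitializerHook(init_val, val_tfrecord_filenames)` for the eval session.)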

    ...

...

# For completeness, the rest of the framework from the tf example
# Note: I removed the gpu and batch_size handling from the example, modify the network accordingly.
def main(_):
    hps = resnet_model.HParams(batch_size=FLAGS.batch_size,
                               num_classes=FLAGS.num_classes,
                               min_lrn_rate=0.0001,
                               lrn_rate=0.1,
                               num_residual_units=5,
                               use_bottleneck=False,
                               weight_decay_rate=0.0002,
                               relu_leakiness=0.1,
                               optimizer='mom')

    if FLAGS.mode == 'train':
        train(hps)
    elif FLAGS.mode == 'eval':
        evaluate(hps)

if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()

@dongjk you can use a hook to do all you need. Following is a code example.

initializer_hook = _DatasetInitializerHook(dataset.make_initializable_iterator())
with tf.train.MonitoredTrainingSession(hooks=[initializer_hook], ...

class _DatasetInitializerHook(tf.train.SessionRunHook):
  def __init__(self, iterator):
    self._iterator = iterator
  def begin(self):
    self._initializer = self._iterator.initializer
  def after_create_session(self, session, coord):
    del coord
    session.run(self._initializer)  # pass feed_dict=... here if your initializer needs one

This happens because the init_op is not run when the worker restarts from a checkpoint. The relevant implementation is in SessionManager.prepare_session().

I think for the purposes of the MonitoredSession, an initializable iterator is more like a “local variable” (which are reinitialized on each worker when they start) than a “global variable” (which are initialized once by the chief, and then restored from checkpoints). Could you try moving the initializer to the Scaffold.local_init_op and see if that fixes things?

(This is clearly “not great”. We’re still figuring out a more elegant way to integrate Datasets with the MonitoredSession and Estimator APIs. Hopefully this suggestion works in the meantime.)

/cc @ispirmustafa for MonitoredSession wisdom.

Scaffold.local_init_op works as intended, thanks for the suggestion. Here is a working example for future reference:

from tensorflow.python.ops import lookup_ops

Scaffold = tf.train.Scaffold(
    local_init_op=control_flow_ops.group(variables.local_variables_initializer(),
                                         lookup_ops.tables_initializer(),
                                         iter_init_op))

Datasets is a pleasant API, thanks for the effort.

@mrry Thanks, it works without any hooks 👍. So the solution is to invoke iterator.string_handle() before creating the MonitoredSession.

# Iterator Handle and MonitoredTrainingSession

import tensorflow as tf

Dataset = tf.data.Dataset    # use tf.contrib.data.Dataset on TF versions before 1.4
Iterator = tf.data.Iterator

dataset_train = Dataset.range(10)
dataset_val = Dataset.range(90, 100)

iter_train_handle = dataset_train.make_one_shot_iterator().string_handle()
iter_val_handle = dataset_val.make_one_shot_iterator().string_handle()

handle = tf.placeholder(tf.string, shape=[])
iterator = Iterator.from_string_handle(
    handle, dataset_train.output_types, dataset_train.output_shapes)
next_batch = iterator.get_next()

with tf.train.MonitoredTrainingSession() as sess:
    handle_train, handle_val = sess.run([iter_train_handle, iter_val_handle])
    
    for step in range(10):
        print('train', sess.run(next_batch, feed_dict={handle: handle_train}))
        
        if step % 3 == 0:
            print('val', sess.run(next_batch, feed_dict={handle: handle_val}))

Output:
('train', 0)
('val', 90)
('train', 1)
('train', 2)
('val', 91)
('train', 3)

Thanks for confirming that that works! We’re definitely still looking for a way to make Datasets work more naturally with MonitoredSession, though 😃.

Right, the handles could potentially be collected in the after_create_session() method. Annoyingly, you’ll need to call iterator.string_handle() on each of the iterators before creating the MonitoredSession, and pass the resulting string-valued tf.Tensor objects to the hook.
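
For instance, here is a hedged sketch of that pattern; the _IteratorHandleHook class and the toy datasets are illustrative, not from this thread:

import tensorflow as tf

# Build the string-handle tensors *before* the MonitoredSession is created.
dataset_train = tf.data.Dataset.range(10)
dataset_val = tf.data.Dataset.range(90, 100)
iter_train_handle = dataset_train.make_one_shot_iterator().string_handle()
iter_val_handle = dataset_val.make_one_shot_iterator().string_handle()

class _IteratorHandleHook(tf.train.SessionRunHook):
    """Evaluates the string-handle tensors once the session has been created."""
    def __init__(self, handle_tensors):
        self._handle_tensors = handle_tensors
        self.handles = None
    def after_create_session(self, session, coord):
        del coord
        self.handles = session.run(self._handle_tensors)

handle_hook = _IteratorHandleHook([iter_train_handle, iter_val_handle])
with tf.train.MonitoredTrainingSession(hooks=[handle_hook]) as sess:
    handle_train, handle_val = handle_hook.handles
    # Feed these handles exactly as in the previous example, e.g.
    # sess.run(next_batch, feed_dict={handle: handle_train})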