tensorflow: A model trained with distributed TensorFlow can't be saved for TensorFlow Serving

System information

  • OS Platform: CentOS 7.1
  • TensorFlow installed from: binary
  • TensorFlow version: 1.2.1
  • Python version: 2.7
  • Bazel version: 0.4.5

Describe the problem

Situation One: I added SavedModel export for TensorFlow Serving to the distributed version of the MNIST model. When I run it on a single machine with one ps server and two workers, saving the model works.

Situation Two: If the model runs on three different machines (e.g. A, B, C), with the ps server on A and the workers on B and C, the model-saving code fails.

Situation Three: I then ran the ps server and one worker on A, and the other worker on B. The worker on A saves the model, and it works again.

Situation Four: Running the ps server on A and both workers on B also works.

I think this is an issue with saving a model through saved_model_builder in distributed TensorFlow.

Source code / logs

My distributed MNIST code is below:

#!/usr/bin/env python2.7
"""Train and export a simple Softmax Regression TensorFlow model.
The model is from the TensorFlow "MNIST For ML Beginner" tutorial. This program
simply follows all its training instructions, and uses TensorFlow SavedModel to
export the trained model with proper signatures that can be loaded by standard
tensorflow_model_server.
Usage: mnist_export.py [--training_iteration=x] [--model_version=y] export_dir
"""

import os
import sys

# This is a placeholder for a Google-internal import.

import tensorflow as tf
from tensorflow.python.ops import variables
from tensorflow.core.protobuf import saver_pb2

from tensorflow.python.saved_model import builder as saved_model_builder
from tensorflow.python.saved_model import signature_constants
from tensorflow.python.saved_model import signature_def_utils
from tensorflow.python.saved_model import tag_constants
from tensorflow.python.saved_model import utils
from tensorflow.python.util import compat
from tensorflow.examples.tutorials.mnist import input_data

from six.moves import xrange

tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")

tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")

tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")

tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("batch_size", 100, "Number of examples per training batch")

tf.app.flags.DEFINE_integer('training_iteration', 2,
                            'number of training iterations.')
tf.app.flags.DEFINE_integer('model_version', 1, 'version number of the model.')
tf.app.flags.DEFINE_string('work_dir', 'model/', 'Working directory.')
tf.app.flags.DEFINE_string('train_dir', 'MNIST_data/', 'Directory containing the MNIST data.')


FLAGS = tf.app.flags.FLAGS


def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")

    worker_hosts = FLAGS.worker_hosts.split(",")
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        train(server, cluster)


def train(server, cluster):
    # Train model
    print('Training model...')
    with tf.device(
            tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
        mnist = input_data.read_data_sets(FLAGS.train_dir, one_hot=True)
        serialized_tf_example = tf.placeholder(tf.string, name='tf_example')
        feature_configs = {'x': tf.FixedLenFeature(shape=[784], dtype=tf.float32), }
        tf_example = tf.parse_example(serialized_tf_example, feature_configs)

        x = tf.identity(tf_example['x'], name='x')  # use tf.identity() to assign name
        y_ = tf.placeholder('float', shape=[None, 10])

        w = tf.Variable(tf.zeros([784, 10]))
        b = tf.Variable(tf.zeros([10]))

        y = tf.nn.softmax(tf.matmul(x, w) + b, name='y')
        cross_entropy = -tf.reduce_sum(y_ * tf.log(y))

        global_step = tf.Variable(0)
        train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy, global_step=global_step)
        values, indices = tf.nn.top_k(y, 10)

        prediction_classes = tf.contrib.lookup.index_to_string(
            tf.to_int64(indices), mapping=tf.constant([str(i) for i in xrange(10)]))
        correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))

        summary_op = tf.summary.merge_all()
        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()

        sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), logdir="train_logs", init_op=init_op,
                                 summary_op=summary_op, saver=saver, global_step=global_step, save_model_secs=600)

        with sv.managed_session(server.target) as sess:
            step = 0

            while not sv.should_stop() and step < 1000:
                batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
                train_feed = {x: batch_xs, y_: batch_ys}

                _, step = sess.run([train_step, global_step], feed_dict=train_feed)

                if step % 1000 == 0:
                    print("global step: {} , accuracy:{}".format(step, sess.run(accuracy,
                                                                                feed_dict=train_feed)))

            print('training accuracy %g' % sess.run(
                accuracy, feed_dict={x: mnist.test.images,
                                     y_: mnist.test.labels}))
            print('Done training!')
            if sv.is_chief:
                # Export model
                # WARNING(break-tutorial-inline-code): The following code snippet is
                # in-lined in tutorials, please update tutorial documents accordingly
                # whenever code changes.
                sess.graph._unsafe_unfinalize()
                export_path_base = FLAGS.work_dir
                export_path = os.path.join(
                    compat.as_bytes(export_path_base),
                    compat.as_bytes(str(FLAGS.model_version)))
                print('Exporting trained model to', export_path)
                builder = saved_model_builder.SavedModelBuilder(export_path)

                # Build the signature_def_map.
                classification_inputs = utils.build_tensor_info(serialized_tf_example)
                classification_outputs_classes = utils.build_tensor_info(prediction_classes)
                classification_outputs_scores = utils.build_tensor_info(values)

                classification_signature = signature_def_utils.build_signature_def(
                    inputs={signature_constants.CLASSIFY_INPUTS: classification_inputs},
                    outputs={
                        signature_constants.CLASSIFY_OUTPUT_CLASSES:
                            classification_outputs_classes,
                        signature_constants.CLASSIFY_OUTPUT_SCORES:
                            classification_outputs_scores
                    },
                    method_name=signature_constants.CLASSIFY_METHOD_NAME)

                tensor_info_x = utils.build_tensor_info(x)
                tensor_info_y = utils.build_tensor_info(y)

                prediction_signature = signature_def_utils.build_signature_def(
                    inputs={'images': tensor_info_x},
                    outputs={'scores': tensor_info_y},
                    method_name=signature_constants.PREDICT_METHOD_NAME)

                legacy_init_op = tf.group(tf.tables_initializer(), name='legacy_init_op')

                builder.add_meta_graph_and_variables(
                    sess, [tag_constants.SERVING],
                    signature_def_map={
                        'predict_images':
                            prediction_signature,
                        signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
                            classification_signature,
                    },
                    legacy_init_op=legacy_init_op)

                builder.save()

                print('Done exporting!')

if __name__ == '__main__':
    tf.app.run()

Error Log

Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
WARNING:tensorflow:From distribute_mnist_serving_model.py:85: index_to_string (from tensorflow.contrib.lookup.lookup_ops) is deprecated and will be removed after 2017-01-07.
Instructions for updating:
This op will be removed after the deprecation date. Please switch to index_to_string_table_from_tensor and call the lookup method of the returned table.
('Exporting trained model to', 'model/1')
2017-07-17 17:30:10.703382: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session e5aa2c66bff69f11 with config:
global step: 0 , accuracy:0.469999998808
training accuracy 0.5701
Done training!
Traceback (most recent call last):
  File "distribute_mnist_serving_model.py", line 176, in <module>
    tf.app.run()
  File "/usr/lib/python2.7/site-packages/tensorflow/python/platform/app.py", line 48, in run
    _sys.exit(main(_sys.argv[:1] + flags_passthrough))
  File "distribute_mnist_serving_model.py", line 58, in main
    train(server, cluster)
  File "distribute_mnist_serving_model.py", line 159, in train
    legacy_init_op=legacy_init_op)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/saved_model/builder_impl.py", line 362, in add_meta_graph_and_variables
    saver.save(sess, variables_path, write_meta_graph=False, write_state=False)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 1488, in save
    raise exc
tensorflow.python.framework.errors_impl.NotFoundError: model/1/variables/variables_temp_962a99f708244380a378c7e2218c6865
         [[Node: save_1/SaveV2 = SaveV2[dtypes=[DT_FLOAT, DT_FLOAT, DT_INT32], _device="/job:ps/replica:0/task:0/cpu:0"](save_1/ShardedFilename, save_1/SaveV2/tensor_names, save_1/SaveV2/shape_and_slices, Variable, Variable_1, Variable_2)]]
Caused by op u'save_1/SaveV2', defined at:
  File "distribute_mnist_serving_model.py", line 176, in <module>
    tf.app.run()
  File "/usr/lib/python2.7/site-packages/tensorflow/python/platform/app.py", line 48, in run
    _sys.exit(main(_sys.argv[:1] + flags_passthrough))
  File "distribute_mnist_serving_model.py", line 58, in main
    train(server, cluster)
  File "distribute_mnist_serving_model.py", line 159, in train
    legacy_init_op=legacy_init_op)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/saved_model/builder_impl.py", line 356, in add_meta_graph_and_variables
    allow_empty=True)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 1139, in __init__
    self.build()
  File "/usr/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 1170, in build
    restore_sequentially=self._restore_sequentially)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 685, in build
    save_tensor = self._AddShardedSaveOps(filename_tensor, per_device)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 361, in _AddShardedSaveOps
    return self._AddShardedSaveOpsForV2(filename_tensor, per_device)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 335, in _AddShardedSaveOpsForV2
    sharded_saves.append(self._AddSaveOps(sharded_filename, saveables))
  File "/usr/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 276, in _AddSaveOps
    save = self.save_op(filename_tensor, saveables)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 219, in save_op
    tensors)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/ops/gen_io_ops.py", line 745, in save_v2
    tensors=tensors, name=name)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/framework/op_def_library.py", line 767, in apply_op
    op_def=op_def)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 2506, in create_op
    original_op=self._default_original_op, op_def=op_def)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 1269, in __init__
    self._traceback = _extract_stack()
NotFoundError (see above for traceback): model/1/variables/variables_temp_962a99f708244380a378c7e2218c6865
         [[Node: save_1/SaveV2 = SaveV2[dtypes=[DT_FLOAT, DT_FLOAT, DT_INT32], _device="/job:ps/replica:0/task:0/cpu:0"](save_1/ShardedFilename, save_1/SaveV2/tensor_names, save_1/SaveV2/shape_and_slices, Variable, Variable_1, Variable_2)]]
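
One detail in the trace above is worth isolating: the failing save_1/SaveV2 node carries a _device annotation under /job:ps, so the file write is executed by the parameter server process, not by the worker that called the builder. The following is a minimal sketch for pulling the job and task out of such a line; the helper name node_placement and the regular expression are illustrative assumptions, not part of TensorFlow.

```python
import re

# Matches the device annotation printed for a graph node in the log above.
# The pattern is an assumption based on that one log line.
_DEVICE_RE = re.compile(
    r'_device="/job:(?P<job>\w+)/replica:(?P<replica>\d+)/task:(?P<task>\d+)')


def node_placement(log_line):
    """Return (job, task) for the node in a log line, or None if absent."""
    match = _DEVICE_RE.search(log_line)
    if match is None:
        return None
    return match.group("job"), int(match.group("task"))


# The node line copied from the error log.
line = ('[[Node: save_1/SaveV2 = SaveV2[dtypes=[DT_FLOAT, DT_FLOAT, DT_INT32], '
        '_device="/job:ps/replica:0/task:0/cpu:0"](save_1/ShardedFilename, '
        'save_1/SaveV2/tensor_names, save_1/SaveV2/shape_and_slices, '
        'Variable, Variable_1, Variable_2)]]')

print(node_placement(line))
```

If the reported job is ps, the path in the NotFoundError is being resolved on the ps machine's filesystem.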

The Correct Log

Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
WARNING:tensorflow:From distribute_mnist_serving_model.py:83: index_to_string (from tensorflow.contrib.lookup.lookup_ops) is deprecated and will be removed after 2017-01-07.
Instructions for updating:
This op will be removed after the deprecation date. Please switch to index_to_string_table_from_tensor and call the lookup method of the returned table.
('Exporting trained model to', 'model/1')
2017-07-17 17:38:46.714706: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session 2618b797d2ef99b4 with config:
global step: 0 , accuracy:0.479999989271
training accuracy 0.5513
Done training!
Done exporting!

Exact command to reproduce

Situation One: works

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=A:2223,A:2224 --job_name=ps --task_index=0

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=A:2223,A:2224 --job_name=worker --task_index=0

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=A:2223,A:2224 --job_name=worker --task_index=1

Situation Two: fails

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=B:2222,C:2222 --job_name=ps --task_index=0

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=B:2222,C:2222 --job_name=worker --task_index=0

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=B:2222,C:2222 --job_name=worker --task_index=1

Situation Three: works

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=A:2223,B:2222 --job_name=ps --task_index=0

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=A:2223,B:2222 --job_name=worker --task_index=0

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=A:2223,B:2222 --job_name=worker --task_index=1

Situation Four: works

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=B:2223,B:2222 --job_name=ps --task_index=0

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=B:2223,B:2222 --job_name=worker --task_index=0

python distribute_mnist_serving_model.py --ps_hosts=A:2222 --worker_hosts=B:2223,B:2222 --job_name=worker --task_index=1
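
For reference, the script turns the --ps_hosts and --worker_hosts flags from the commands above into a cluster definition by splitting on commas. A small sketch of that step is shown here without TensorFlow; the helper names parse_hosts and cluster_dict are hypothetical. It also drops blank entries, since a plain "".split(",") on an unset flag returns [''] rather than an empty list.

```python
def parse_hosts(flag_value):
    """Split a comma-separated host:port flag, dropping blanks and whitespace."""
    return [host.strip() for host in flag_value.split(",") if host.strip()]


def cluster_dict(ps_hosts, worker_hosts):
    """Build the dict that the script would pass to tf.train.ClusterSpec."""
    return {"ps": parse_hosts(ps_hosts), "worker": parse_hosts(worker_hosts)}


# Situation Two: ps on A, workers on B and C.
print(cluster_dict("A:2222", "B:2222,C:2222"))

# An unset flag yields an empty list instead of a single empty address.
print(cluster_dict("", "B:2222,C:2222"))
```

The worker at index 0 of the "worker" list is the chief in this script, because is_chief is set from task_index == 0.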

I don't know what causes this problem. How can I fix it? Thanks a lot.

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 28 (10 by maintainers)

Most upvoted comments

Hello all, just follow the video below and export your own model within 10 seconds.

https://youtu.be/w0Ebsbz7HYA

Yes. I think this happens because the parameter server and the chief worker are on different nodes. Apparently they need to save their checkpoints and model files to the same common directory (e.g. NFS or a cloud bucket). Alternatively, you can run the distributed setup without a parameter server (in which case the chief node acts as the ps). However, I’ve found that this is much slower and basically negates the speedup you get by going distributed.
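
Following the common-directory suggestion above, one way to avoid accidentally exporting to a per-machine location is to reject relative paths such as the script's default model/ before building the export path. This is only a sketch: the helper name build_export_path and the mount point /mnt/shared/models are hypothetical, and it assumes the shared directory is mounted at the same path on the ps and chief machines.

```python
import posixpath


def build_export_path(shared_base, model_version):
    """Join a shared base directory and a model version into an export path.

    Relative paths are rejected because each process resolves them against
    its own working directory on its own machine.
    """
    is_remote_uri = "://" in shared_base  # e.g. an hdfs:// or gs:// location
    if not is_remote_uri and not posixpath.isabs(shared_base):
        raise ValueError(
            "export base must be an absolute path or a URI, got %r" % shared_base)
    return posixpath.join(shared_base, str(model_version))


# Hypothetical NFS mount visible to both the ps and the chief worker.
print(build_export_path("/mnt/shared/models", 1))
```

The returned string would take the place of the export path built from FLAGS.work_dir and FLAGS.model_version in the original script.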

It’d be great if TensorFlow added a function to allow us to consolidate the graph at the end onto the chief node in order to save the trained model.