tensorflow: TPUStrategy broken in TF2 Keras

System information

  • Have I written custom code: YES
  • OS Platform and Distribution: Google Colab
  • TensorFlow installed from (source or binary): Colab
  • TensorFlow version (use command below): (‘v2.0.0-rc2-26-g64c3d38’, ‘2.0.0’)
  • GPU model and memory: Colab TPU

Describe the current behavior TPU in collab cannot be used with TF 2, trying to use TFRecordDataset as an input to a Keras Model.fit() generates different exceptions depending on the whether TPUStrategy.experimental_distribute_dataset() is used or not.

  1. when TPUStrategy.experimental_distribute_dataset() is not used or used but not within an /job:worker context:
InternalError: Failed copying input tensor from /job:localhost/replica:0/task:0/device:CPU:0 to /job:worker/replica:0/task:0/device:CPU:0 in order to run AutoShardDataset: Unable to parse tensor proto
Additional GRPC error information:
{"created":"@1574107145.043685979","description":"Error received from peer","file":"external/grpc/src/core/lib/surface/call.cc","file_line":1039,"grpc_message":"Unable to parse tensor proto","grpc_status":3} [Op:AutoShardDataset]
  1. using a /job:worker context without experimental_distribute_dataset crashes the Colab session with:
Nov 18, 2019, 9:17:39 PM	WARNING	2019-11-18 20:17:39.591672: E tensorflow/core/framework/variant.cc:102] Could not decode variant with type_name: "tensorflow::DatasetVariantWrapper". Perhaps you forgot to register a decoder via REGISTER_UNARY_VARIANT_DECODE_FUNCTION?
Nov 18, 2019, 9:09:53 PM	WARNING	2019-11-18 20:09:53.396513: E tensorflow/core/framework/dataset.cc:76] The Encode() method is not implemented for DatasetVariantWrapper objects.
Nov 18, 2019, 9:08:19 PM	WARNING	2019-11-18 20:08:19.842178: E tensorflow/core/framework/dataset.cc:76] The Encode() method is not implemented for DatasetVariantWrapper objects.
  1. and 4. when experimental_distribute_dataset is used in a tf.device("/job:worker") context:
/tensorflow-2.0.0/python3.6/tensorflow_core/python/keras/engine/training.py in _distribution_standardize_user_data(self, x, y, sample_weight, class_weight, batch_size, validation_split, shuffle, epochs, allow_partial_batch)
   2313         x = ds.batch(batch_size, drop_remainder=drop_remainder)
   2314       else:
-> 2315         assert isinstance(x, dataset_ops.DatasetV2)
   2316         training_utils.validate_dataset_input(x, y, sample_weight,
   2317                                               validation_split)

Describe the expected behavior There should be at least one way of making the example bellow work with TF2 (it works with TF1).

Code to reproduce the issue could also be checked here https://colab.research.google.com/gist/kpe/22340866c1dd3208d9177d2c8a9322e3/tpu-emb.ipynb

%tensorflow_version 2.x
import tensorflow as tf
print("TF version:", tf.__version__)

import os

tfrec_path = "gs://kpe-pub/pub/tpu-strategy-ds-issue/test.tfrecords"

def parse_example(proto):
    return tf.io.parse_single_example(proto, {
        "feature": tf.io.VarLenFeature(tf.float32), 
        "label":   tf.io.VarLenFeature(tf.int64)
    })

def from_tfrecords_file(tfrec_path):
    ds = tf.data.TFRecordDataset([tfrec_path], compression_type="GZIP")
    ds = ds.map(parse_example)
    def to_dense(example):
        feature = tf.cast(tf.sparse.to_dense(example["feature"]), tf.float32)
        label   = tf.cast(tf.sparse.to_dense(example["label"]), tf.int32)
        return feature, tf.squeeze(label, -1)
    ds = ds.map(to_dense)
    return ds

try:
    TPU_WORKER = 'grpc://' + os.environ['COLAB_TPU_ADDR']
    tf.config.experimental_connect_to_host(TPU_WORKER)
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except:
    strategy = tf.distribute.get_strategy()
print(strategy)


def test_strategy(use_case):
    assert use_case in [0,1,2,3,4]

    if tf.__version__.startswith("1."): # in TF1 no need to distribute the dataset
        ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
    else:
        if use_case == 0:
            ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
        if use_case == 1:
            ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
            ds = strategy.experimental_distribute_dataset(ds)
        if use_case == 2:            
            with tf.device("/job:worker"):
                ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
        if use_case == 3:
            with tf.device("/job:worker"):
                ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
                ds = strategy.experimental_distribute_dataset(ds)
        if use_case == 4:
            with strategy.scope(), tf.device("/job:worker"):
                ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
                ds = strategy.experimental_distribute_dataset(ds)

    with strategy.scope():
        model = tf.keras.models.Sequential([
            tf.keras.layers.InputLayer(input_shape=(128,)),
            tf.keras.layers.Dense(2)
        ])
        model.build()
        model.compile("adam", loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))
        model.fit(ds, steps_per_epoch=4)

test_strategy(0) # try 0 ot 4

Other info / logs Include any logs or source code that would be helpful to diagnose the problem. If including tracebacks, please include the full traceback. Large logs and files should be attached.

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 15 (4 by maintainers)

Most upvoted comments

Just noted, that disabling eager execution, makes the test cases 0 and 2 above work again, i.e. without experimental_distribute_dataset() it works fine, just eager_execution needs to be disabled.