tensorflow: TPUStrategy broken in TF2 Keras
System information
- Have I written custom code: YES
- OS Platform and Distribution: Google Colab
- TensorFlow installed from (source or binary): Colab
- TensorFlow version (use command below): ('v2.0.0-rc2-26-g64c3d38', '2.0.0')
- GPU model and memory: Colab TPU
Describe the current behavior
TPU in Colab cannot be used with TF 2: feeding a `TFRecordDataset` to a Keras `Model.fit()` raises different exceptions depending on whether `TPUStrategy.experimental_distribute_dataset()` is used or not.
- when `TPUStrategy.experimental_distribute_dataset()` is not used, or is used but not within a `/job:worker` context (use cases 0 and 1; the variant-serialization sketch after this list illustrates the likely cause):

```
InternalError: Failed copying input tensor from /job:localhost/replica:0/task:0/device:CPU:0 to /job:worker/replica:0/task:0/device:CPU:0 in order to run AutoShardDataset: Unable to parse tensor proto
Additional GRPC error information:
{"created":"@1574107145.043685979","description":"Error received from peer","file":"external/grpc/src/core/lib/surface/call.cc","file_line":1039,"grpc_message":"Unable to parse tensor proto","grpc_status":3} [Op:AutoShardDataset]
```
- using a `/job:worker` context without `experimental_distribute_dataset` (use case 2) crashes the Colab session with:

```
Nov 18, 2019, 9:17:39 PM WARNING 2019-11-18 20:17:39.591672: E tensorflow/core/framework/variant.cc:102] Could not decode variant with type_name: "tensorflow::DatasetVariantWrapper". Perhaps you forgot to register a decoder via REGISTER_UNARY_VARIANT_DECODE_FUNCTION?
Nov 18, 2019, 9:09:53 PM WARNING 2019-11-18 20:09:53.396513: E tensorflow/core/framework/dataset.cc:76] The Encode() method is not implemented for DatasetVariantWrapper objects.
Nov 18, 2019, 9:08:19 PM WARNING 2019-11-18 20:08:19.842178: E tensorflow/core/framework/dataset.cc:76] The Encode() method is not implemented for DatasetVariantWrapper objects.
```
- when `experimental_distribute_dataset` is used in a `tf.device("/job:worker")` context (use cases 3 and 4):

```
/tensorflow-2.0.0/python3.6/tensorflow_core/python/keras/engine/training.py in _distribution_standardize_user_data(self, x, y, sample_weight, class_weight, batch_size, validation_split, shuffle, epochs, allow_partial_batch)
   2313     x = ds.batch(batch_size, drop_remainder=drop_remainder)
   2314   else:
-> 2315     assert isinstance(x, dataset_ops.DatasetV2)
   2316   training_utils.validate_dataset_input(x, y, sample_weight,
   2317                                         validation_split)
```
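The assert in the last case fires because `experimental_distribute_dataset()` returns a `DistributedDataset`, which is not a `tf.data.Dataset` (`dataset_ops.DatasetV2`) subclass, while `Model.fit()` in TF 2.0 appears to expect a plain dataset and distribute it internally. A minimal sketch (my illustration, not from the original report), assuming the `strategy` object created in the reproduction code below:

```python
import tensorflow as tf

# Assumes `strategy` is the TPUStrategy from the reproduction code below.
ds = tf.data.Dataset.from_tensor_slices([1.0, 2.0, 3.0]).batch(1)
dist_ds = strategy.experimental_distribute_dataset(ds)

print(isinstance(ds, tf.data.Dataset))       # True
print(isinstance(dist_ds, tf.data.Dataset))  # False -> trips the assert in training.py
```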
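For the first two cases, the "Unable to parse tensor proto" error and the `Encode()`/decode warnings plausibly point at the same limitation: a `tf.data` dataset is represented as a variant tensor, and dataset variants cannot be serialized for transfer between the local host and the TPU worker. A short sketch of that representation (my illustration, not from the original report):

```python
import tensorflow as tf

ds = tf.data.Dataset.range(4)
# A dataset is wrapped in a variant tensor (a DatasetVariantWrapper under the hood).
variant = tf.data.experimental.to_variant(ds)
print(variant.dtype)  # tf.variant -- no Encode()/decode is registered for it,
                      # so it cannot cross the gRPC boundary to /job:worker intact
```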
Describe the expected behavior
There should be at least one way of making the example below work with TF2 (it works with TF1).

Code to reproduce the issue
The issue can also be reproduced in this Colab notebook: https://colab.research.google.com/gist/kpe/22340866c1dd3208d9177d2c8a9322e3/tpu-emb.ipynb
```python
%tensorflow_version 2.x
import tensorflow as tf
print("TF version:", tf.__version__)

import os

tfrec_path = "gs://kpe-pub/pub/tpu-strategy-ds-issue/test.tfrecords"

def parse_example(proto):
    return tf.io.parse_single_example(proto, {
        "feature": tf.io.VarLenFeature(tf.float32),
        "label": tf.io.VarLenFeature(tf.int64)
    })

def from_tfrecords_file(tfrec_path):
    ds = tf.data.TFRecordDataset([tfrec_path], compression_type="GZIP")
    ds = ds.map(parse_example)

    def to_dense(example):
        feature = tf.cast(tf.sparse.to_dense(example["feature"]), tf.float32)
        label = tf.cast(tf.sparse.to_dense(example["label"]), tf.int32)
        return feature, tf.squeeze(label, -1)

    ds = ds.map(to_dense)
    return ds

try:
    TPU_WORKER = 'grpc://' + os.environ['COLAB_TPU_ADDR']
    tf.config.experimental_connect_to_host(TPU_WORKER)
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except:
    strategy = tf.distribute.get_strategy()  # no TPU - fall back to the default strategy
print(strategy)

def test_strategy(use_case):
    assert use_case in [0, 1, 2, 3, 4]
    if tf.__version__.startswith("1."):  # in TF1 there is no need to distribute the dataset
        ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
    else:
        if use_case == 0:  # plain dataset, not distributed
            ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
        if use_case == 1:  # distributed, created on localhost
            ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
            ds = strategy.experimental_distribute_dataset(ds)
        if use_case == 2:  # created on the worker, not distributed
            with tf.device("/job:worker"):
                ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
        if use_case == 3:  # created on the worker and distributed
            with tf.device("/job:worker"):
                ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
                ds = strategy.experimental_distribute_dataset(ds)
        if use_case == 4:  # like 3, but within strategy.scope()
            with strategy.scope(), tf.device("/job:worker"):
                ds = from_tfrecords_file(tfrec_path).repeat().batch(32)
                ds = strategy.experimental_distribute_dataset(ds)

    with strategy.scope():
        model = tf.keras.models.Sequential([
            tf.keras.layers.InputLayer(input_shape=(128,)),
            tf.keras.layers.Dense(2)
        ])
        model.build()
        model.compile("adam",
                      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))
    model.fit(ds, steps_per_epoch=4)

test_strategy(0)  # try 0 to 4
```
About this issue
- Original URL
- State: closed
- Created 5 years ago
- Comments: 15 (4 by maintainers)
Just noted that disabling eager execution makes test cases 0 and 2 above work again, i.e. without `experimental_distribute_dataset()` it works fine; only eager execution needs to be disabled.
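A minimal sketch of that workaround (based only on the comment above, assuming a TF 2.0 Colab TPU runtime):

```python
import tensorflow as tf

# Disable eager execution before any other TF calls, then run the
# reproduction code above unchanged; per the comment, use cases 0 and 2
# then train without errors.
tf.compat.v1.disable_eager_execution()
```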