tfx: TFX Evaluator: Cast string to float is not supported

Tensorflow Version: 2.3.1 TFX Version: 0.25.0 Python Version: 3.7.3

I ran into a problem: in the Evaluator component, the label value of the raw example is not transformed into the indicator (multi-hot) format.

pipeline:

    # Pipeline components, gathered in the order they are declared below.
    components = []

    # Example ingestion from the CSV files under `data_path`.
    example_gen = CsvExampleGen(input_base=data_path)

    # Statistics over the ingested examples.
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

    # Schema derived from the statistics, with feature shapes inferred.
    schema_gen = SchemaGen(
        statistics=statistics_gen.outputs['statistics'],
        infer_feature_shape=True)

    # Validation of the statistics against the inferred schema.
    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema'])

    # Feature engineering driven by the user-provided preprocessing function.
    transform = Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        preprocessing_fn=PREPROCESSING_FN
    )

    components.extend([
        example_gen,
        statistics_gen,
        schema_gen,
        example_validator,
        transform,
    ])

    # Trains with the user-provided `run_fn`, executed through the generic
    # trainer executor, on the examples produced by Transform.
    trainer_args = dict(
        run_fn=RUN_FN,
        transformed_examples=transform.outputs['transformed_examples'],
        schema=schema_gen.outputs['schema'],
        transform_graph=transform.outputs['transform_graph'],
        train_args=trainer_pb2.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
        eval_args=trainer_pb2.EvalArgs(num_steps=configs.EVAL_NUM_STEPS),
        custom_executor_spec=executor_spec.ExecutorClassSpec(
            trainer_executor.GenericExecutor),
    )

    trainer = Trainer(**trainer_args)
    components.append(trainer)

    # Resolve the latest blessed model; it is passed to the Evaluator below
    # as the baseline model.
    model_resolver = ResolverNode(
        instance_name='latest_blessed_model_resolver',
        resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
        model=Channel(type=Model),
        model_blessing=Channel(type=ModelBlessing))
    components.append(model_resolver)

    # TFMA configuration: a single overall slice, an example count and an
    # accuracy metric, with the label read from the "tags" key.
    # NOTE(review): the Evaluator below is fed the raw examples from
    # `example_gen`, so "tags" is presumably the untransformed string label.
    # That would explain "Cast string to float is not supported" when the
    # Keras metric casts y_true -- confirm, and evaluate against the
    # transformed label if so.
    eval_metrics = [
        tfma.metrics.ExampleCount(name='example_count'),
        tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
    ]
    eval_config = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key="tags")],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=tfma.metrics.specs_from_metrics(eval_metrics))

    # Evaluate the freshly trained model against the resolved baseline. Any
    # change threshold is ignored when no baseline exists yet (first run).
    evaluator = Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config)
    components.append(evaluator)

    # Push the trained model to a filesystem destination under
    # `serving_model_dir`, gated on the blessing emitted by the Evaluator.
    push_destination = pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=serving_model_dir))
    pusher_args = dict(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        push_destination=push_destination,
    )

    pusher = Pusher(**pusher_args)  # pylint: disable=unused-variable
    components.append(pusher)

transformation:

def _fill_in_missing(x):
    """Densify `x`, fill absent entries, and squeeze out axis 1.

    A `tf.sparse.SparseTensor` is rebuilt with dense shape `[batch, 1]` and
    converted to a dense tensor whose empty slots hold '' (string dtype) or 0
    (any other dtype). Any other input is used as-is. In both cases axis 1 of
    the result is squeezed away.

    Args:
      x: A `SparseTensor` of rank 2 whose second dimension has size at most 1,
        or an already dense tensor with a size-1 second dimension.

    Returns:
      A rank 1 tensor in which the missing values of `x` have been filled in.
    """
    # Dense inputs have nothing to fill; only the squeeze applies.
    if not isinstance(x, tf.sparse.SparseTensor):
        return tf.squeeze(x, axis=1)

    if x.dtype == tf.string:
        fill_value = ''
    else:
        fill_value = 0

    batch_size = x.dense_shape[0]
    single_column = tf.SparseTensor(x.indices, x.values, [batch_size, 1])
    return tf.squeeze(tf.sparse.to_dense(single_column, fill_value), axis=1)

def _transform_labels(inputs, num_labels=12):
    """Turn the comma-separated label string into a multi-hot indicator.

    The raw label is densified, split on ",", mapped to integer ids through a
    vocabulary stored under `features.LABEL_KEY`, one-hot encoded, and reduced
    with a max over the token axis so that every label present in an example
    sets its own position to 1.

    Args:
      inputs: Dict of raw feature tensors; must contain `features.LABEL_KEY`.
      num_labels: Width of the indicator vector (the `depth` of the one-hot
        encoding). Defaults to 12, the value previously hard-coded here.

    Returns:
      A tensor with one indicator row of width `num_labels` per example.
    """
    labels = _fill_in_missing(inputs[features.LABEL_KEY])
    label_tokens = tf.compat.v1.strings.split(labels, sep=",")
    label_ids = tft.compute_and_apply_vocabulary(
        label_tokens, vocab_filename=features.LABEL_KEY)

    # Examples with fewer labels than the widest row in the batch are padded
    # when densified. Padding with the default 0 would collide with vocabulary
    # id 0 and wrongly switch that class on for every padded example; -1 lies
    # outside [0, num_labels), so tf.one_hot emits an all-zero row for it.
    dense_ids = tf.sparse.to_dense(label_ids, default_value=-1)

    return tf.reduce_max(tf.one_hot(dense_ids, depth=num_labels), axis=1)


def preprocessing_fn(inputs):
    """Build the transformed feature dict from the raw `inputs`.

    Args:
      inputs: Dict of raw feature tensors, passed unchanged to the content
        and label transforms.

    Returns:
      A dict with the transformed content under "content_xf" and the
      transformed labels under "tags_xf".
    """
    return {
        "content_xf": _transform_contents(inputs),
        "tags_xf": _transform_labels(inputs),
    }

trainer:

def _input_fn(file_pattern, data_accessor, tf_transform_output, batch_size=200):
    """Create the batched dataset used for training and evaluation.

    Args:
      file_pattern: List of paths or patterns of input tfrecord files.
      data_accessor: DataAccessor whose `tf_dataset_factory` builds the dataset.
      tf_transform_output: A TFTransformOutput; its transformed metadata schema
        is handed to the dataset factory.
      batch_size: Number of consecutive elements of the returned dataset to
        combine in a single batch.

    Returns:
      The dataset produced by `data_accessor.tf_dataset_factory`, configured
      with "tags_xf" as the label key.
    """
    options = dataset_options.TensorFlowDatasetOptions(
        batch_size=batch_size,
        label_key="tags_xf")
    transformed_schema = tf_transform_output.transformed_metadata.schema
    return data_accessor.tf_dataset_factory(
        file_pattern, options, transformed_schema)


def _build_model(n_labels=12, dropout_rate=0.3, learning_rate=1e-3):
    """Build and compile the Keras classifier over the "content_xf" feature.

    The network is two 128-unit ReLU layers, each followed by dropout, and a
    sigmoid output layer with one unit per label. It is compiled with binary
    cross-entropy and the Adam optimizer.

    Args:
      n_labels: Number of output units. Defaults to 12.
      dropout_rate: Dropout rate applied after each hidden layer. Defaults
        to 0.3.
      learning_rate: Learning rate for Adam. Defaults to 1e-3.

    Returns:
      The compiled `keras.Model`.
    """
    columns = tf.feature_column.numeric_column("content_xf", shape=features.TOP_K)

    input_layer = {"content_xf": keras.layers.Input(name="content", shape=features.TOP_K)}

    inputs = keras.layers.DenseFeatures(columns)(input_layer)
    x = keras.layers.Dense(128, kernel_initializer='he_uniform', activation='relu')(inputs)
    x = keras.layers.Dropout(dropout_rate)(x)
    x = keras.layers.Dense(128, kernel_initializer='he_uniform', activation='relu')(x)
    x = keras.layers.Dropout(dropout_rate)(x)
    outputs = keras.layers.Dense(n_labels, activation='sigmoid')(x)

    model = keras.Model(input_layer, outputs=outputs)
    model.compile(
        loss='binary_crossentropy',
        # `lr` is only a backward-compatibility alias; `learning_rate` is the
        # documented argument name.
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate))
    model.summary(print_fn=logging.info)

    return model


def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Build the serving function: parse tf.Examples, apply TFT, run `model`.

    Args:
      model: The Keras model to serve; the transform features layer is stored
        on it as `model.tft_layer`.
      tf_transform_output: A TFTransformOutput providing the raw feature spec
        and the transform features layer.

    Returns:
      A `tf.function` taking serialized tf.Examples and returning the model
      output.
    """
    # Keep the transform layer as a model attribute; the function below uses it.
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        raw_spec = tf_transform_output.raw_feature_spec()
        # Drop the label from the parsing spec so requests are parsed
        # without it.
        raw_spec.pop(features.LABEL_KEY)
        raw_features = tf.io.parse_example(serialized_tf_examples, raw_spec)
        return model(model.tft_layer(raw_features))

    return serve_tf_examples_fn


def run_fn(fn_args):
    """Train the model and save it with a `serving_default` signature.

    Args:
      fn_args: Trainer arguments. This function reads `transform_output`,
        `train_files`, `eval_files`, `data_accessor`, `train_steps`,
        `eval_steps` and `serving_model_dir` from it.
    """
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    def make_dataset(files, batch_size):
        # Both splits share the data accessor and the transform output.
        return _input_fn(
            files, fn_args.data_accessor, tf_transform_output, batch_size)

    train_dataset = make_dataset(fn_args.train_files, constants.TRAIN_BATCH_SIZE)
    eval_dataset = make_dataset(fn_args.eval_files, constants.EVAL_BATCH_SIZE)

    print(train_dataset)
    print("element_spec", train_dataset.element_spec)

    model = _build_model()
    model.fit(
        train_dataset,
        epochs=10,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps)

    # Export with a signature that accepts a batch of serialized tf.Examples.
    serving_fn = _get_serve_tf_examples_fn(model, tf_transform_output)
    examples_spec = tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
    model.save(
        fn_args.serving_model_dir,
        save_format='tf',
        signatures={
            'serving_default': serving_fn.get_concrete_function(examples_spec),
        })

When evaluating the model, the following exception occurred:

INFO:absl:Running executor for Evaluator
WARNING:absl:"maybe_add_baseline" and "maybe_remove_baseline" are deprecated,
        please use "has_baseline" instead.
INFO:absl:Request was made to ignore the baseline ModelSpec and any change thresholds. This is likely because a baseline model was not provided: updated_config=
model_specs {
  label_key: "tags"
}
slicing_specs {
}
metrics_specs {
  metrics {
    class_name: "ExampleCount"
    config: "{\"name\": \"example_count\"}"
  }
}
metrics_specs {
  metrics {
    class_name: "WeightedExampleCount"
    config: "{\"name\": \"weighted_example_count\"}"
  }
}
metrics_specs {
  metrics {
    class_name: "ExampleCount"
    config: "{\"name\": \"example_count\"}"
  }
}

INFO:absl:Using ./pipeline_output/Trainer/model/218/serving_model_dir as  model.
INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
2020-12-04 18:46:17.286423: W tensorflow/core/framework/op_kernel.cc:1744] OP_REQUIRES failed at cast_op.cc:124 : Unimplemented: Cast string to float is not supported
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1006, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "apache_beam/runners/worker/operations.py", line 1035, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/evaluators/metrics_plots_and_validations_evaluator.py", line 341, in add_input
    result = c.add_input(a, get_combiner_input(elements[0], i))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 630, in add_input
    self._process_batch(accumulator)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 594, in _process_batch
    metric.update_state(*inputs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/utils/metrics_utils.py", line 90, in decorated
    update_op = update_state_fn(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/metrics.py", line 176, in update_state_fn
    return ag_update_state(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 255, in wrapper
    return converted_call(f, args, kwargs, options=options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 457, in converted_call
    return _call_unconverted(f, args, kwargs, options, False)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 339, in _call_unconverted
    return f(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/metrics.py", line 603, in update_state
    y_true = math_ops.cast(y_true, self._dtype)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
    return target(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/ops/math_ops.py", line 922, in cast
    x = gen_math_ops.cast(x, base_type, name=name)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/ops/gen_math_ops.py", line 1858, in cast
    _ops.raise_from_not_ok_status(e, name)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/framework/ops.py", line 6843, in raise_from_not_ok_status
    six.raise_from(core._status_to_exception(e.code, message), None)
  File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.UnimplementedError: Cast string to float is not supported [Op:Cast]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "beam_dag_runner.py", line 30, in <module>
    run_pipeline()
  File "beam_dag_runner.py", line 24, in run_pipeline
    metadata_connection_config=metadata.sqlite_metadata_connection_config(METADATA_PATH)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 165, in run
    absl.logging.info('Component %s is scheduled.', component_id)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/pipeline.py", line 568, in __exit__
    self.result = self.run()
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/pipeline.py", line 547, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 176, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 186, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 346, in run_stages
    bundle_context_manager,
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 534, in _run_stage
    bundle_manager)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 572, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 852, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 353, in push
    response = self.worker.do_instruction(request)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 484, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in process_bundle
    element.data)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 84, in process
    self._run_component()
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 88, in _run_component
    self._component_launcher.launch()
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 209, in launch
    copy.deepcopy(execution_decision.exec_properties))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py", line 72, in _run_executor
    copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/components/evaluator/executor.py", line 259, in Do
    tensor_adapter_config=tensor_adapter_config))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/pipeline.py", line 568, in __exit__
    self.result = self.run()
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/pipeline.py", line 547, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 176, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 186, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 346, in run_stages
    bundle_context_manager,
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 534, in _run_stage
    bundle_manager)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 572, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 852, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 353, in push
    response = self.worker.do_instruction(request)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 484, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in process_bundle
    element.data)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 809, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 818, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1221, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 860, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1069, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 1072, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1069, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 1072, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 155, in apache_beam.runners.worker.operations.ConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1376, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1006, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "apache_beam/runners/worker/operations.py", line 1035, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/evaluators/metrics_plots_and_validations_evaluator.py", line 341, in add_input
    result = c.add_input(a, get_combiner_input(elements[0], i))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 630, in add_input
    self._process_batch(accumulator)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 594, in _process_batch
    metric.update_state(*inputs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/utils/metrics_utils.py", line 90, in decorated
    update_op = update_state_fn(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/metrics.py", line 176, in update_state_fn
    return ag_update_state(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 255, in wrapper
    return converted_call(f, args, kwargs, options=options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 457, in converted_call
    return _call_unconverted(f, args, kwargs, options, False)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 339, in _call_unconverted
    return f(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/metrics.py", line 603, in update_state
    y_true = math_ops.cast(y_true, self._dtype)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
    return target(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/ops/math_ops.py", line 922, in cast
    x = gen_math_ops.cast(x, base_type, name=name)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/ops/gen_math_ops.py", line 1858, in cast
    _ops.raise_from_not_ok_status(e, name)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/framework/ops.py", line 6843, in raise_from_not_ok_status
    six.raise_from(core._status_to_exception(e.code, message), None)
  File "<string>", line 3, in raise_from
RuntimeError: tensorflow.python.framework.errors_impl.UnimplementedError: Cast string to float is not supported [Op:Cast] [while running 'ExtractEvaluateAndWriteResults/ExtractAndEvaluate/EvaluateMetricsAndPlots/ComputeMetricsAndPlots()/ComputePerSlice/ComputeUnsampledMetrics/CombinePerSliceKey/WindowIntoDiscarding']

Debug information: image

I have read all the tutorials and guides, and still have no idea how to resolve this problem.

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 36 (16 by maintainers)

Most upvoted comments

Having a shared serving function is no longer recommended. Instead, create a separate signature function for just the preprocessing and then update the EvalConfig to specify this function under preprocessing_function_names leaving the main signature function for inference.

For example:

  # Attach the transform-features layer to the model as `tft_layer`; both
  # signature functions below call it through `model.tft_layer`.
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def serve_tf_examples_fn(serialized_tf_example):
    """Parses serialized examples, transforms them, and returns the model output.

    The label entry is dropped from the raw feature spec before parsing,
    because labels are not present at serving time.
    """
    serving_spec = tf_transform_output.raw_feature_spec()
    serving_spec.pop(_LABEL_KEY)
    parsed_inputs = tf.io.parse_example(serialized_tf_example, serving_spec)
    return model(model.tft_layer(parsed_inputs))

  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def transform_features_fn(serialized_tf_example):
    """Parses serialized examples with the full raw feature spec (label kept)
    and returns the transformed features to be fed as input to the evaluator.
    """
    parsed_inputs = tf.io.parse_example(
        serialized_tf_example, tf_transform_output.raw_feature_spec())
    return model.tft_layer(parsed_inputs)

  # Map each signature name to its function: 'serving_default' runs inference,
  # 'transform_features' only applies the preprocessing (label included).
  return {
      'serving_default': serve_tf_examples_fn,
      'transform_features': transform_features_fn
  }

Then update the EvalConfig:

# Point TFMA at the inference signature and name the separate preprocessing
# signature, so the evaluator transforms raw features (including the label)
# before computing metrics.
eval_config = tfma.EvalConfig(
      model_specs=[
          tfma.ModelSpec(
              signature_name='serving_default',
              # Must match the key of the exported preprocessing signature.
              preprocessing_function_names=['transform_features'],
              # Placeholder: use the name of your (transformed) label feature.
              label_key='...')
      ],
      # ... (remaining EvalConfig arguments elided)
)

This is the current workaround for having it work both with inference and with TFMA:

def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Returns a function that parses a serialized tf.Example and applies TFT.

    Side effect: assigns the transform-features layer obtained from
    `tf_transform_output` to `model.tft_layer`.

    Args:
        model: Model object; it receives the `tft_layer` attribute and is
            called on the transformed features. (Presumably a Keras model --
            confirm against the caller.)
        tf_transform_output: Object providing `transform_features_layer()` and
            `raw_feature_spec()`. (Presumably a `tft.TFTransformOutput` --
            confirm against the caller.)

    Returns:
        The `tf.function`-decorated `serve_tf_examples_fn` defined below.
    """

    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        feature_spec = tf_transform_output.raw_feature_spec()
        if not model.tft_layer.built:
            # Temporary workaround: Need to call the tft_layer with the label so that
            # it will be included in the layer's input_spec. This is needed so that
            # TFMA can call tft_layer with labels. However, the actual call for
            # inference is done without the label.
            parsed_features_with_label = tf.io.parse_example(
                serialized_tf_examples, feature_spec)
            _ = model.tft_layer(parsed_features_with_label)
        # Drop the label from the spec so the inference path below parses and
        # transforms the non-label features only.
        feature_spec.pop(_LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn

Then in the TFMA config use “tft_layer” for the preprocessing function:

# Tell TFMA to run the model's `tft_layer` as the preprocessing step before
# evaluating against the transformed label.
eval_config = tfma.EvalConfig(
    # NOTE the use of ["tft_layer"] vs "tft_layer"
    model_specs=[
        tfma.ModelSpec(
            signature_name="serving_default",
            label_key="label_xf",
            preprocessing_function_names=["tft_layer"],
        )
    ],
    # ... (remaining EvalConfig arguments elided)
)

Hi @mdreves, Thank you so much for the fast reply! Really amazing. I hope TFT support in TFMA makes it into the next release. If not, I am happy to test it via the nightly versions.

Thank you for all your work and support, Hannes

TFT transformations are not yet supported. My hope is that they will be in the next TFMA release, but there is still an outstanding issue to resolve so it might not make it in before the cut off.