transform: Error in graph analyzer when using the Universal Sentence Encoder from TF Hub, as per the tutorial

I am getting an error when running the following code with the DirectRunner:

import tensorflow as tf
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
import tensorflow_transform.coders as tft_coders
from apache_beam.options.pipeline_options import PipelineOptions
import tempfile

# Lazily-loaded TF Hub model shared by every call to embed_text().
model = None


def embed_text(text):
    """Return Universal Sentence Encoder embeddings for ``text``.

    The TF Hub module is loaded on the first call and stored in the
    module-level ``model`` global; subsequent calls reuse that object.

    NOTE(review): inside a tf.Transform preprocessing_fn the cached model may
    be reused across separately traced graphs -- confirm this is safe for the
    TF/TFT versions in use.
    """
    import tensorflow_hub as hub
    global model
    if model is not None:
        return model(text)
    model = hub.load(
        'https://tfhub.dev/google/universal-sentence-encoder/4')
    return model(text)


def get_metadata():
    """Build the raw-input metadata for the pipeline.

    Declares two features, ``id`` and ``text``, each a ``tf.string`` with
    shape ``[]`` and a fixed-column representation.

    Returns:
        A ``dataset_metadata.DatasetMetadata`` wrapping that schema.
    """
    from tensorflow_transform.tf_metadata import dataset_metadata
    from tensorflow_transform.tf_metadata import dataset_schema

    def _scalar_string_column():
        # Both raw features share exactly the same column definition.
        return dataset_schema.ColumnSchema(
            tf.string, [], dataset_schema.FixedColumnRepresentation())

    raw_schema = dataset_schema.Schema({
        feature_name: _scalar_string_column()
        for feature_name in ('id', 'text')
    })
    return dataset_metadata.DatasetMetadata(raw_schema)


def preprocess_fn(input_features):
    """tf.Transform preprocessing function.

    Args:
        input_features: mapping with at least ``'id'`` and ``'text'`` entries.

    Returns:
        A dict with ``'id'`` passed through unchanged and ``'embedding'``
        holding the result of ``embed_text`` on ``'text'``.
    """
    # Embed first so the call order matches the original implementation.
    sentence_embedding = embed_text(input_features['text'])
    return {'id': input_features['id'], 'embedding': sentence_embedding}


def run(pipeline_options, known_args):
    """Build and run a tf.Transform pipeline that embeds a few sample texts.

    The pipeline creates four in-memory records. It runs ``preprocess_fn`` over
    them via ``tft_beam.AnalyzeAndTransformDataset`` and writes the transformed
    records as a single TFRecord shard encoded with ``ExampleProtoCoder``. The
    transform function itself is not persisted.

    NOTE: per the maintainers, the ``AttributeError: 'Tensor' object has no
    attribute 'type'`` raised from graph_tools with this code is fixed in
    tensorflow_transform 0.23.

    Args:
        pipeline_options: ``PipelineOptions`` built by the caller (e.g. with
            ``save_main_session`` set). If None, options are parsed from
            ``sys.argv``.
        known_args: namespace with ``output_dir`` (TFRecord file prefix) and,
            optionally, ``transform_temp_dir`` (tf.Transform temp directory).

    Raises:
        ValueError: if ``known_args.output_dir`` is not set.
    """
    if pipeline_options is None:
        # PipelineOptions(None) parses sys.argv.
        pipeline_options = PipelineOptions(None)

    output_prefix = getattr(known_args, 'output_dir', None)
    if not output_prefix:
        # Formatting None would silently write files prefixed with 'None'.
        raise ValueError(
            'known_args.output_dir must be set (use --output_dir).')

    # Honour --transform_temp_dir when supplied; otherwise use a fresh temp dir.
    transform_temp_dir = (getattr(known_args, 'transform_temp_dir', None)
                          or tempfile.mkdtemp())

    pipeline = beam.Pipeline(options=pipeline_options)
    with tft_beam.Context(temp_dir=transform_temp_dir):
        articles = (
            pipeline
            | beam.Create([
                {'id': '01', 'text': 'To be, or not to be: that is the question: '},
                {'id': '02', 'text': "Whether 'tis nobler in the mind to suffer "},
                {'id': '03', 'text': 'The slings and arrows of outrageous fortune, '},
                {'id': '04', 'text': 'Or to take arms against a sea of troubles, '},
            ]))

        articles_dataset = (articles, get_metadata())

        # The transform function is not written out; only the data is kept.
        transformed_dataset, _transform_fn = (
            articles_dataset
            | 'Extract embeddings' >> tft_beam.AnalyzeAndTransformDataset(
                preprocess_fn))

        transformed_data, transformed_metadata = transformed_dataset

        _ = (
            transformed_data
            | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
                file_path_prefix=str(output_prefix),
                file_name_suffix='.tfrecords',
                coder=tft_coders.example_proto_coder.ExampleProtoCoder(
                    transformed_metadata.schema),
                num_shards=1))

    result = pipeline.run()
    # Beam's PipelineResult method is wait_until_finish(), not
    # wait_until_finished().
    result.wait_until_finish()

The pipeline is called with:

import argparse
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from etl import pipeline_local_minimal as pipeline


def get_args(argv):
  """Split ``argv`` into this job's own flags and the leftover Beam flags.

  Args:
    argv: list of command-line tokens, or None to parse ``sys.argv[1:]``.

  Returns:
    ``(known_args, pipeline_args)``: an ``argparse.Namespace`` with the
    options declared below, and the list of tokens argparse did not
    recognise (intended for ``PipelineOptions``).
  """
  option_specs = (
      ('--output_dir',
       dict(help='A directory location of output embeddings')),
      ('--enable_debug',
       dict(action='store_true', help='Enable debug options.')),
      ('--debug_output_prefix',
       dict(help='Specify prefix of debug output.')),
      ('--transform_temp_dir',
       dict(default='tft_temp',
            help='A temp directory used by tf.transform.')),
      ('--transform_export_dir',
       dict(default='tft_out',
            help='A directory where tft function is saved')),
  )

  arg_parser = argparse.ArgumentParser()
  for flag, flag_options in option_specs:
    arg_parser.add_argument(flag, **flag_options)
  return arg_parser.parse_known_args(argv)


def main(argv=None):
  """Parse flags, enable save_main_session, and run the pipeline.

  Args:
    argv: command-line tokens, or None to use ``sys.argv``.
  """
  known_args, beam_args = get_args(argv)
  options = PipelineOptions(beam_args)
  # view_as() returns a view over the same option values, so this setting
  # is carried by `options` itself.
  options.view_as(SetupOptions).save_main_session = True
  pipeline.run(options, known_args)


if __name__ == '__main__':
  # Show only ERROR-and-above records from the root logger.
  logging.root.setLevel(logging.ERROR)
  main()

and launched with the following shell script:

# Configurable parameters
ROOT_DIR="./data"

# Datastore parameters
KIND="wikipedia"

# Path prefix (not a directory) for the output data files; passed to the
# pipeline as --output_dir.
OUTPUT_PREFIX="${ROOT_DIR}/${KIND}/embeddings/embed"

# Working directories for Dataflow
DF_JOB_DIR="${ROOT_DIR}/${KIND}/dataflow"
STAGING_LOCATION="${DF_JOB_DIR}/staging"
TEMP_LOCATION="${DF_JOB_DIR}/temp"

# Working directories for tf.transform
TRANSFORM_ROOT_DIR="${DF_JOB_DIR}/transform"
TRANSFORM_TEMP_DIR="${TRANSFORM_ROOT_DIR}/temp"
TRANSFORM_EXPORT_DIR="${TRANSFORM_ROOT_DIR}/export"

# Working directories for Debug log
DEBUG_OUTPUT_PREFIX="${DF_JOB_DIR}/debug/log"

# Running Config for Dataflow
RUNNER=DirectRunner

# Cleaning working and output directories before running the Dataflow job
#echo "Cleaning working and output directories..."
#rm -r "${DF_JOB_DIR}"
#rm -r "${OUTPUT_PREFIX}"

echo "Running the Dataflow job..."

# Command to run the Dataflow job. The setup file path is quoted so a
# working directory containing spaces does not split the argument.
python run_local.py \
  --output_dir="${OUTPUT_PREFIX}" \
  --transform_temp_dir="${TRANSFORM_TEMP_DIR}" \
  --transform_export_dir="${TRANSFORM_EXPORT_DIR}" \
  --runner="${RUNNER}" \
  --kind="${KIND}" \
  --staging_location="${STAGING_LOCATION}" \
  --temp_location="${TEMP_LOCATION}" \
  --setup_file="$(pwd)/setup.py" \
  --enable_debug \
  --debug_output_prefix="${DEBUG_OUTPUT_PREFIX}"
status=$?

# Only report success when the job actually succeeded.
if [ "${status}" -ne 0 ]; then
  echo "Dataflow job failed with exit status ${status}." >&2
  exit "${status}"
fi

echo "Dataflow job submitted successfully!"

Error:

...
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 655, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 338, in process
    lambda: self._make_graph_state(saved_model_dir))
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 221, in acquire
    return _shared_map.acquire(self._key, constructor_fn)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 184, in acquire
    result = control_block.acquire(constructor_fn)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 87, in acquire
    result = constructor_fn()
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 338, in <lambda>
    lambda: self._make_graph_state(saved_model_dir))
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 314, in _make_graph_state
    self._exclude_outputs, self._tf_config)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 232, in __init__
    tensor_inputs = graph_tools.get_dependent_inputs(graph, inputs, fetches)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 690, in get_dependent_inputs
    sink_tensors_ready)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 503, in __init__
    table_init_op, graph_analyzer_for_table_init, translate_path_fn)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 564, in _get_table_init_op_source_info
    if table_init_op.type not in _TABLE_INIT_OP_TYPES:
AttributeError: 'Tensor' object has no attribute 'type'

It looks like the graph analyzer expects a list of ops with a `type` attribute but is being passed a tensor instead. It is unclear to me what is going wrong here. Any help would be greatly appreciated!

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 16 (5 by maintainers)

Most upvoted comments

We’re having the exact same issue, both locally and on Dataflow.

Could you let me know what version of Transform/TFX you are using? A recent commit should have fixed this. The commit is in the Transform 0.23 release and should be in the TFX 0.23 release.

Thanks!