tfx: Importing InteractiveContext or ml-metadata before TensorFlow breaks TFDV in TFX

Importing InteractiveContext before TensorFlow breaks TFDV.

The root cause has been difficult to narrow down, but it appears to lie in the TFX module that contains InteractiveContext rather than in TFDV.

Below are two code blocks. In the first, `tensorflow` is imported before InteractiveContext, and statistics generation works as expected. In the second, InteractiveContext is imported before `tensorflow` or `tensorflow_data_validation`, and `tfdv.generate_statistics_from_tfrecord` fails with a `NotFoundError`. In a notebook, any further work after this error crashes the kernel.

This issue appeared in TFX 0.23.

# Reproduction, case 1 (works): `tensorflow` is imported BEFORE InteractiveContext.
# With this import order, statistics generation below succeeds.
import tensorflow as tf

from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext()

# TFDV is imported only after InteractiveContext has been constructed.
import tensorflow_data_validation as tfdv

# Write a single tf.train.Example with one int64 feature ('feature' = [0])
# to a TFRecord file named 'data'.
with tf.io.TFRecordWriter('data') as writer:
    example_proto = tf.train.Example(features=tf.train.Features(feature={
        'feature': tf.train.Feature(int64_list=tf.train.Int64List(value=[0]))
    }))
    writer.write(example_proto.SerializeToString())

# Compute statistics over every file matching './data*'. In this import order
# the report shows a statistics result with num_examples: 1.
tfdv.generate_statistics_from_tfrecord('./data*')

WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
WARNING:tensorflow:From /Users/lars.schoning/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/utils/stats_util.py:229: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version.
Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
datasets {
  num_examples: 1
  features {
    num_stats {
...
# Reproduction, case 2 (fails): InteractiveContext is imported BEFORE `tensorflow`.
# This is identical to case 1 except for the order of the first two imports.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

import tensorflow as tf

context = InteractiveContext()

# TFDV is imported only after InteractiveContext has been constructed.
import tensorflow_data_validation as tfdv

# Write a single tf.train.Example with one int64 feature ('feature' = [0])
# to a TFRecord file named 'data'.
with tf.io.TFRecordWriter('data') as writer:
    example_proto = tf.train.Example(features=tf.train.Features(feature={
        'feature': tf.train.Feature(int64_list=tf.train.Int64List(value=[0]))
    }))
    writer.write(example_proto.SerializeToString())

# In this import order, the report shows this call raising
# NotFoundError: No attr named 'BoostedTreesFlushQuantileSummaries' in NodeDef
# (raised from the quantiles combiner inside the Beam pipeline).
tfdv.generate_statistics_from_tfrecord('./data*')
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
---------------------------------------------------------------------------
NotFoundError                             Traceback (most recent call last)
<ipython-input-4-c5945afe5b0e> in <module>
      5     writer.write(example_proto.SerializeToString())
      6 
----> 7 tfdv.generate_statistics_from_tfrecord('./data*')

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/utils/stats_gen_lib.py in generate_statistics_from_tfrecord(data_location, output_path, stats_options, pipeline_options, compression_type)
    109         | 'GenerateStatistics' >> stats_api.GenerateStatistics(stats_options)
    110         | 'WriteStatsOutput' >> stats_api.WriteStatisticsToTFRecord(
--> 111             output_path))
    112   return stats_util.load_statistics(output_path)
    113 

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
    553     try:
    554       if not exc_type:
--> 555         self.result = self.run()
    556         self.result.wait_until_finish()
    557     finally:

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    532         finally:
    533           shutil.rmtree(tmpdir)
--> 534       return self.runner.run_pipeline(self, self._options)
    535     finally:
    536       shutil.rmtree(self.local_tempdir, ignore_errors=True)

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    117       runner = BundleBasedDirectRunner()
    118 
--> 119     return runner.run_pipeline(pipeline, options)
    120 
    121 

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
    174 
    175     self._latest_run_result = self.run_via_runner_api(
--> 176         pipeline.to_runner_api(default_environment=self._default_environment))
    177     return self._latest_run_result
    178 

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
    184     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    185     #   the teststream (if any), and all the stages).
--> 186     return self.run_stages(stage_context, stages)
    187 
    188   @contextlib.contextmanager

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
    342           stage_results = self._run_stage(
    343               runner_execution_context,
--> 344               bundle_context_manager,
    345           )
    346           monitoring_infos_by_stage[stage.name] = (

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
    521               input_timers,
    522               expected_timer_output,
--> 523               bundle_manager)
    524 
    525       final_result = merge_results(last_result)

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    559 
    560     result, splits = bundle_manager.process_bundle(
--> 561         data_input, data_output, input_timers, expected_timer_output)
    562     # Now we collect all the deferred inputs remaining from bundle execution.
    563     # Deferred inputs can be:

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
    943     with thread_pool_executor.shared_unbounded_instance() as executor:
    944       for result, split_result in executor.map(execute, zip(part_inputs,  # pylint: disable=zip-builtin-not-iterating
--> 945                                                             timer_inputs)):
    946         split_result_list += split_result
    947         if merged_result is None:

~/.pyenv/versions/3.7.7/lib/python3.7/concurrent/futures/_base.py in result_iterator()
    596                     # Careful not to keep a reference to the popped future
    597                     if timeout is None:
--> 598                         yield fs.pop().result()
    599                     else:
    600                         yield fs.pop().result(end_time - time.monotonic())

~/.pyenv/versions/3.7.7/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

~/.pyenv/versions/3.7.7/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py in run(self)
     42       # If the future wasn't cancelled, then attempt to execute it.
     43       try:
---> 44         self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
     45       except BaseException as exc:
     46         # Even though Python 2 futures library has #set_exection(),

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in execute(part_map_input_timers)
    939           input_timers,
    940           expected_output_timers,
--> 941           dry_run)
    942 
    943     with thread_pool_executor.shared_unbounded_instance() as executor:

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
    839             process_bundle_descriptor.id,
    840             cache_tokens=[next(self._cache_token_generator)]))
--> 841     result_future = self._worker_handler.control_conn.push(process_bundle_req)
    842 
    843     split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
    351       self._uid_counter += 1
    352       request.instruction_id = 'control_%s' % self._uid_counter
--> 353     response = self.worker.do_instruction(request)
    354     return ControlFuture(request.instruction_id, response)
    355 

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
    481       # E.g. if register is set, this will call self.register(request.register))
    482       return getattr(self, request_type)(
--> 483           getattr(request, request_type), request.instruction_id)
    484     else:
    485       raise NotImplementedError

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
    516         with self.maybe_profile(instruction_id):
    517           delayed_applications, requests_finalization = (
--> 518               bundle_processor.process_bundle(instruction_id))
    519           monitoring_infos = bundle_processor.monitoring_infos()
    520           monitoring_infos.extend(self.state_cache_metrics_fn())

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
    986       for op in self.ops.values():
    987         _LOGGER.debug('finish %s', op)
--> 988         op.finish()
    989 
    990       # Close every timer output stream

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.PGBKCVOperation.finish()

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.PGBKCVOperation.finish()

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.PGBKCVOperation.output_key()

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in compact(self, accumulator)
    729       accumulator: _CombinerStatsGeneratorsCombineFnAcc
    730       ) -> _CombinerStatsGeneratorsCombineFnAcc:
--> 731     self._maybe_do_batch(accumulator, force=True)
    732     self._num_compacts.inc(1)
    733     return accumulator

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in _maybe_do_batch(self, accumulator, force)
    670       accumulator.partial_accumulators = self._for_each_generator(
    671           lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
--> 672           accumulator.partial_accumulators)
    673       del accumulator.input_record_batches[:]
    674       accumulator.curr_batch_size = 0

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in _for_each_generator(self, func, *args)
    624     """
    625     return [func(gen, *args_for_func) for gen, args_for_func in zip(
--> 626         self._generators, zip(*args))]
    627 
    628   def create_accumulator(self

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in <listcomp>(.0)
    623       func(self._generators[i], args[0][i], args[1][i], ...).
    624     """
--> 625     return [func(gen, *args_for_func) for gen, args_for_func in zip(
    626         self._generators, zip(*args))]
    627 

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in <lambda>(gen, gen_acc)
    669             accumulator.input_record_batches)
    670       accumulator.partial_accumulators = self._for_each_generator(
--> 671           lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
    672           accumulator.partial_accumulators)
    673       del accumulator.input_record_batches[:]

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py in add_input(self, accumulator, examples)
   1051         # Store empty summary for each of the quantiles computations.
   1052         stats_for_feature.numeric_stats.quantiles_summary = (
-> 1053             self._values_quantiles_combiner.create_accumulator())
   1054         stats_for_feature.numeric_stats.weighted_quantiles_summary = (
   1055             self._values_quantiles_combiner.create_accumulator())

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/utils/quantiles_util.py in create_accumulator(self)
     54 
     55   def create_accumulator(self) -> List[List[float]]:
---> 56     return self._quantiles_spec.create_accumulator()
     57 
     58   def add_input(

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py in create_accumulator(self)
   2028 
   2029   def create_accumulator(self):
-> 2030     graph_state = self._get_graph_state()
   2031     return graph_state.empty_summary
   2032 

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py in _get_graph_state(self)
   2024           random_slot=random_slot)
   2025       self._graph_state = _QuantilesGraphStateProvider.get_graph_state(
-> 2026           graph_state_options)
   2027     return self._graph_state
   2028 

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py in get_graph_state(cls, graph_state_options)
   2354     result = cls._graph_states_by_options.get(graph_state_options)
   2355     if result is None:
-> 2356       result = _QuantilesGraphState(graph_state_options)
   2357       cls._graph_states_by_options[graph_state_options] = result
   2358     return result

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py in __init__(self, options)
   2212           fetches=tf.raw_ops.BoostedTreesFlushQuantileSummaries(
   2213               quantile_stream_resource_handle=self._resource,
-> 2214               num_features=options.num_features))
   2215 
   2216       graph.finalize()

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow/python/client/session.py in make_callable(self, fetches, feed_list, accept_options)
   1239     # returned object, because the arguments to `fetches` must already be
   1240     # in the graph.
-> 1241     self._extend_graph()
   1242 
   1243     # Create a fetch handler to take care of the structure of fetches.

~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow/python/client/session.py in _extend_graph(self)
   1386   def _extend_graph(self):
   1387     with self._graph._session_run_lock():  # pylint: disable=protected-access
-> 1388       tf_session.ExtendSession(self._session)
   1389 
   1390   # The threshold to run garbage collection to delete dead tensors.

NotFoundError: No attr named 'BoostedTreesFlushQuantileSummaries' in NodeDef:
	 [[{{node quantiles_combiner}}]]

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Reactions: 2
  • Comments: 15 (6 by maintainers)

Commits related to this issue

Most upvoted comments

I'm seeing the same issue with TF 2.3 and TFX 0.25 when running under AirflowDagRunner with the Beam DirectRunner. It goes away if I import tensorflow before any ml-metadata code.

 File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 686, in add_input
    self._maybe_do_batch(accumulator)
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 673, in _maybe_do_batch
    accumulator.partial_accumulators)
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 627, in _for_each_generator
    self._generators, zip(*args))]
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 626, in <listcomp>
    return [func(gen, *args_for_func) for gen, args_for_func in zip(
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 672, in <lambda>
    lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 1053, in add_input
    self._values_quantiles_combiner.create_accumulator())
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_data_validation/utils/quantiles_util.py", line 56, in create_accumulator
    return self._quantiles_spec.create_accumulator()
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 2033, in create_accumulator
    graph_state = self._get_graph_state()
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 2029, in _get_graph_state
    graph_state_options)
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 2359, in get_graph_state
    result = _QuantilesGraphState(graph_state_options)
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 2217, in __init__
    num_features=options.num_features))
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow/python/client/session.py", line 1241, in make_callable
    self._extend_graph()
  File "/Users/gcasassaez/workspace/local_envs/airflow_env/env/lib/python3.7/site-packages/tensorflow/python/client/session.py", line 1388, in _extend_graph
    tf_session.ExtendSession(self._session)
RuntimeError: tensorflow.python.framework.errors_impl.NotFoundError: No attr named 'BoostedTreesFlushQuantileSummaries' in NodeDef:
	 [[{{node quantiles_combiner}}]] [while running 'GenerateStatistics[train]/RunStatsGenerators/GenerateSlicedStatisticsImpl/RunCombinerStatsGenerators/WindowIntoDiscarding']