tfx: Importing InteractiveContext or ml-metadata before TensorFlow breaks TFDV
Importing InteractiveContext before TensorFlow breaks TFDV.
The issue has been difficult to narrow down, but it appears to lie in the TFX module that contains InteractiveContext rather than in TFDV itself.
Below are two code blocks. In the first, tensorflow is imported before InteractiveContext, and everything works as expected. In the second, InteractiveContext is imported before tensorflow or tfdv, and statistics generation fails with a NotFoundError; in a notebook, further work then crashes the kernel.
This issue appeared in TFX 0.23.
import tensorflow as tf
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
import tensorflow_data_validation as tfdv
with tf.io.TFRecordWriter('data') as writer:
    example_proto = tf.train.Example(features=tf.train.Features(feature={
        'feature': tf.train.Feature(int64_list=tf.train.Int64List(value=[0]))
    }))
    writer.write(example_proto.SerializeToString())
tfdv.generate_statistics_from_tfrecord('./data*')
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
WARNING:tensorflow:From /Users/lars.schoning/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/utils/stats_util.py:229: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version.
Instructions for updating:
Use eager execution and:
`tf.data.TFRecordDataset(path)`
datasets {
  num_examples: 1
  features {
    num_stats {
      ...
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
import tensorflow as tf
context = InteractiveContext()
import tensorflow_data_validation as tfdv
with tf.io.TFRecordWriter('data') as writer:
    example_proto = tf.train.Example(features=tf.train.Features(feature={
        'feature': tf.train.Feature(int64_list=tf.train.Int64List(value=[0]))
    }))
    writer.write(example_proto.SerializeToString())
tfdv.generate_statistics_from_tfrecord('./data*')
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
---------------------------------------------------------------------------
NotFoundError Traceback (most recent call last)
<ipython-input-4-c5945afe5b0e> in <module>
5 writer.write(example_proto.SerializeToString())
6
----> 7 tfdv.generate_statistics_from_tfrecord('./data*')
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/utils/stats_gen_lib.py in generate_statistics_from_tfrecord(data_location, output_path, stats_options, pipeline_options, compression_type)
109 | 'GenerateStatistics' >> stats_api.GenerateStatistics(stats_options)
110 | 'WriteStatsOutput' >> stats_api.WriteStatisticsToTFRecord(
--> 111 output_path))
112 return stats_util.load_statistics(output_path)
113
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
553 try:
554 if not exc_type:
--> 555 self.result = self.run()
556 self.result.wait_until_finish()
557 finally:
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
532 finally:
533 shutil.rmtree(tmpdir)
--> 534 return self.runner.run_pipeline(self, self._options)
535 finally:
536 shutil.rmtree(self.local_tempdir, ignore_errors=True)
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
117 runner = BundleBasedDirectRunner()
118
--> 119 return runner.run_pipeline(pipeline, options)
120
121
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
174
175 self._latest_run_result = self.run_via_runner_api(
--> 176 pipeline.to_runner_api(default_environment=self._default_environment))
177 return self._latest_run_result
178
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
184 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
185 # the teststream (if any), and all the stages).
--> 186 return self.run_stages(stage_context, stages)
187
188 @contextlib.contextmanager
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
342 stage_results = self._run_stage(
343 runner_execution_context,
--> 344 bundle_context_manager,
345 )
346 monitoring_infos_by_stage[stage.name] = (
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
521 input_timers,
522 expected_timer_output,
--> 523 bundle_manager)
524
525 final_result = merge_results(last_result)
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
559
560 result, splits = bundle_manager.process_bundle(
--> 561 data_input, data_output, input_timers, expected_timer_output)
562 # Now we collect all the deferred inputs remaining from bundle execution.
563 # Deferred inputs can be:
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
943 with thread_pool_executor.shared_unbounded_instance() as executor:
944 for result, split_result in executor.map(execute, zip(part_inputs, # pylint: disable=zip-builtin-not-iterating
--> 945 timer_inputs)):
946 split_result_list += split_result
947 if merged_result is None:
~/.pyenv/versions/3.7.7/lib/python3.7/concurrent/futures/_base.py in result_iterator()
596 # Careful not to keep a reference to the popped future
597 if timeout is None:
--> 598 yield fs.pop().result()
599 else:
600 yield fs.pop().result(end_time - time.monotonic())
~/.pyenv/versions/3.7.7/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
433 raise CancelledError()
434 elif self._state == FINISHED:
--> 435 return self.__get_result()
436 else:
437 raise TimeoutError()
~/.pyenv/versions/3.7.7/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
382 def __get_result(self):
383 if self._exception:
--> 384 raise self._exception
385 else:
386 return self._result
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py in run(self)
42 # If the future wasn't cancelled, then attempt to execute it.
43 try:
---> 44 self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
45 except BaseException as exc:
46 # Even though Python 2 futures library has #set_exection(),
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in execute(part_map_input_timers)
939 input_timers,
940 expected_output_timers,
--> 941 dry_run)
942
943 with thread_pool_executor.shared_unbounded_instance() as executor:
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
839 process_bundle_descriptor.id,
840 cache_tokens=[next(self._cache_token_generator)]))
--> 841 result_future = self._worker_handler.control_conn.push(process_bundle_req)
842
843 split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
351 self._uid_counter += 1
352 request.instruction_id = 'control_%s' % self._uid_counter
--> 353 response = self.worker.do_instruction(request)
354 return ControlFuture(request.instruction_id, response)
355
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
481 # E.g. if register is set, this will call self.register(request.register))
482 return getattr(self, request_type)(
--> 483 getattr(request, request_type), request.instruction_id)
484 else:
485 raise NotImplementedError
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
516 with self.maybe_profile(instruction_id):
517 delayed_applications, requests_finalization = (
--> 518 bundle_processor.process_bundle(instruction_id))
519 monitoring_infos = bundle_processor.monitoring_infos()
520 monitoring_infos.extend(self.state_cache_metrics_fn())
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
986 for op in self.ops.values():
987 _LOGGER.debug('finish %s', op)
--> 988 op.finish()
989
990 # Close every timer output stream
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.PGBKCVOperation.finish()
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.PGBKCVOperation.finish()
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.PGBKCVOperation.output_key()
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in compact(self, accumulator)
729 accumulator: _CombinerStatsGeneratorsCombineFnAcc
730 ) -> _CombinerStatsGeneratorsCombineFnAcc:
--> 731 self._maybe_do_batch(accumulator, force=True)
732 self._num_compacts.inc(1)
733 return accumulator
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in _maybe_do_batch(self, accumulator, force)
670 accumulator.partial_accumulators = self._for_each_generator(
671 lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
--> 672 accumulator.partial_accumulators)
673 del accumulator.input_record_batches[:]
674 accumulator.curr_batch_size = 0
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in _for_each_generator(self, func, *args)
624 """
625 return [func(gen, *args_for_func) for gen, args_for_func in zip(
--> 626 self._generators, zip(*args))]
627
628 def create_accumulator(self
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in <listcomp>(.0)
623 func(self._generators[i], args[0][i], args[1][i], ...).
624 """
--> 625 return [func(gen, *args_for_func) for gen, args_for_func in zip(
626 self._generators, zip(*args))]
627
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in <lambda>(gen, gen_acc)
669 accumulator.input_record_batches)
670 accumulator.partial_accumulators = self._for_each_generator(
--> 671 lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
672 accumulator.partial_accumulators)
673 del accumulator.input_record_batches[:]
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py in add_input(self, accumulator, examples)
1051 # Store empty summary for each of the quantiles computations.
1052 stats_for_feature.numeric_stats.quantiles_summary = (
-> 1053 self._values_quantiles_combiner.create_accumulator())
1054 stats_for_feature.numeric_stats.weighted_quantiles_summary = (
1055 self._values_quantiles_combiner.create_accumulator())
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_data_validation/utils/quantiles_util.py in create_accumulator(self)
54
55 def create_accumulator(self) -> List[List[float]]:
---> 56 return self._quantiles_spec.create_accumulator()
57
58 def add_input(
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py in create_accumulator(self)
2028
2029 def create_accumulator(self):
-> 2030 graph_state = self._get_graph_state()
2031 return graph_state.empty_summary
2032
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py in _get_graph_state(self)
2024 random_slot=random_slot)
2025 self._graph_state = _QuantilesGraphStateProvider.get_graph_state(
-> 2026 graph_state_options)
2027 return self._graph_state
2028
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py in get_graph_state(cls, graph_state_options)
2354 result = cls._graph_states_by_options.get(graph_state_options)
2355 if result is None:
-> 2356 result = _QuantilesGraphState(graph_state_options)
2357 cls._graph_states_by_options[graph_state_options] = result
2358 return result
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py in __init__(self, options)
2212 fetches=tf.raw_ops.BoostedTreesFlushQuantileSummaries(
2213 quantile_stream_resource_handle=self._resource,
-> 2214 num_features=options.num_features))
2215
2216 graph.finalize()
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow/python/client/session.py in make_callable(self, fetches, feed_list, accept_options)
1239 # returned object, because the arguments to `fetches` must already be
1240 # in the graph.
-> 1241 self._extend_graph()
1242
1243 # Create a fetch handler to take care of the structure of fetches.
~/.pyenv/versions/3.7.7/envs/beam-3.7/lib/python3.7/site-packages/tensorflow/python/client/session.py in _extend_graph(self)
1386 def _extend_graph(self):
1387 with self._graph._session_run_lock(): # pylint: disable=protected-access
-> 1388 tf_session.ExtendSession(self._session)
1389
1390 # The threshold to run garbage collection to delete dead tensors.
NotFoundError: No attr named 'BoostedTreesFlushQuantileSummaries' in NodeDef:
[[{{node quantiles_combiner}}]]
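The only workaround I have found so far is to make sure tensorflow is imported before InteractiveContext (or anything else that pulls in ml-metadata), as in the first block above. A minimal sketch; the comment about op registration is my guess based on the error above, not a confirmed root cause:

import tensorflow as tf  # must be imported first; see the working block above

# Guess: importing TF first means ops such as
# BoostedTreesFlushQuantileSummaries are registered before
# ml-metadata's shared libraries are loaded, so the quantiles
# combiner in TFDV can find them later.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
import tensorflow_data_validation as tfdv

context = InteractiveContext()
stats = tfdv.generate_statistics_from_tfrecord('./data*')  # succeeds in this order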
About this issue
- State: closed
- Created 4 years ago
- Reactions: 2
- Comments: 15 (6 by maintainers)
Commits related to this issue
- Temporarily adds unnecessary import to mitigate weird import order issue (Github Issue: #2576). PiperOrigin-RevId: 341964697 — committed to tensorflow/tfx by deleted user 4 years ago
- Temporarily adds unnecessary import to mitigate weird import order issue(Github Issue: #2576). PiperOrigin-RevId: 342165996 — committed to tensorflow/tfx by deleted user 4 years ago
Seeing this same issue when running with TF 2.3 and TFX 0.25 in AirflowDagRunner and using DirectRunner. It gets fixed if I import tensorflow before any ml-metadata code.
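In case it helps others hitting this under Airflow, a sketch of the reordering in the DAG definition module (the pipeline itself is omitted; only the import order matters here):

# Import TensorFlow first, before any TFX / ml-metadata imports,
# per the workaround described above.
import tensorflow as tf  # noqa: F401  (imported only for its import-time side effects)

from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner

# ... define and run the pipeline as usual; with tensorflow imported
# first, TFDV statistics generation under DirectRunner no longer
# raises NotFoundError.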