tfx: Transform fails with S3 as backend storage

I’m running TFX in Kubeflow and am now trying to use an S3 backend, e.g. Minio.

ExampleGen, StatisticsGen and SchemaGen complete successfully, but Transform fails. It seems to have finished all computations and written the graph to a tmp dir in S3, but it then fails when it tries to copy the graph to the actual path.

In Minio I can see the output of the analyzer cache and the transform_tmp dir. transformed_examples is empty.

Below is the log output with the error:

INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267047   nanos: 216625690 } message: "Assets added to graph." instruction_id: "bundle_170" transform_id: "Analyze/CreateSavedModel[tf_compat_v1]/BindTensors/ReplaceWithConstants" log_location: "/usr/local/lib/python3.7/dist-packages/tensorflow/python/saved_model/builder_impl.py:666" thread: "Thread-12" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267047   nanos: 279367208 } message: "Assets written to: s3://pipelines/tfx/trace_pipeline_e2e/TransformMaster/transform_graph/1736/.temp_path/tftransform_tmp/3d458c0a7a00464f9632b2d467cdd963/assets" instruction_id: "bundle_170" transform_id: "Analyze/CreateSavedModel[tf_compat_v1]/BindTensors/ReplaceWithConstants" log_location: "/usr/local/lib/python3.7/dist-packages/tensorflow/python/saved_model/builder_impl.py:775" thread: "Thread-12" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267047   nanos: 331916809 } message: "SavedModel written to: s3://pipelines/tfx/trace_pipeline_e2e/TransformMaster/transform_graph/1736/.temp_path/tftransform_tmp/3d458c0a7a00464f9632b2d467cdd963/saved_model.pb" instruction_id: "bundle_170" transform_id: "Analyze/CreateSavedModel[tf_compat_v1]/BindTensors/ReplaceWithConstants" log_location: "/usr/local/lib/python3.7/dist-packages/tensorflow/python/saved_model/builder_impl.py:426" thread: "Thread-12" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: ERROR timestamp {   seconds: 1612267136   nanos: 111515760 } message: "Error processing instruction bundle_170. Original traceback is\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1239, in process\n    return self.do_fn_invoker.invoke_process(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 769, in invoke_process\n    windowed_value, additional_args, additional_kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 893, in _invoke_process_per_window\n    self.process_method(*args_for_process),\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/transforms/core.py\", line 1590, in <lambda>\n    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]\n  File \"/usr/local/lib/python3.7/dist-packages/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py\", line 37, in _copy_tree_to_unique_temp_dir\n    _copy_tree(source, destination)\n  File \"/usr/local/lib/python3.7/dist-packages/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py\", line 51, in _copy_tree\n    os.path.join(source, filename), os.path.join(destination, filename))\n  File \"/usr/local/lib/python3.7/dist-packages/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py\", line 53, in _copy_tree\n    tf.io.gfile.copy(source, destination)\n  File \"/usr/local/lib/python3.7/dist-packages/tensorflow/python/lib/io/file_io.py\", line 513, in copy_v2\n    compat.as_bytes(src), compat.as_bytes(dst), overwrite)\ntensorflow.python.framework.errors_impl.AbortedError: All 10 retry attempts failed. 
The last failure: Unknown: : No response body.\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py\", line 289, in _execute\n    response = task()\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py\", line 362, in <lambda>\n    lambda: self.create_worker().do_instruction(request), request)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py\", line 607, in do_instruction\n    getattr(request, request_type), request.instruction_id)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py\", line 644, in process_bundle\n    bundle_processor.process_bundle(instruction_id))\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py\", line 1000, in process_bundle\n    element.data)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py\", line 228, in process_encoded\n    self.output(decoded_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/operations.py\", line 359, in output\n    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/operations.py\", line 221, in receive\n    self.consumer.process(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/operations.py\", line 719, in process\n    delayed_application = self.dofn_runner.process(o)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1241, in process\n    self._reraise_augmented(exn)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1239, in process\n    return self.do_fn_invoker.invoke_process(windowed_value)\n  File 
\"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 588, in invoke_process\n    windowed_value, self.process_method(windowed_value.value))\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1401, in process_outputs\n    self.main_receivers.receive(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/operations.py\", line 221, in receive\n    self.consumer.process(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/operations.py\", line 719, in process\n    delayed_application = self.dofn_runner.process(o)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1241, in process\n    self._reraise_augmented(exn)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1239, in process\n    return self.do_fn_invoker.invoke_process(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 588, in invoke_process\n    windowed_value, self.process_method(windowed_value.value))\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1401, in process_outputs\n    self.main_receivers.receive(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/operations.py\", line 221, in receive\n    self.consumer.process(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/operations.py\", line 719, in process\n    delayed_application = self.dofn_runner.process(o)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1241, in process\n    self._reraise_augmented(exn)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1239, in process\n    return self.do_fn_invoker.invoke_process(windowed_value)\n  File 
\"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 769, in invoke_process\n    windowed_value, additional_args, additional_kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 894, in _invoke_process_per_window\n    self.threadsafe_watermark_estimator)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1401, in process_outputs\n    self.main_receivers.receive(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/operations.py\", line 158, in receive\n    cython.cast(Operation, consumer).process(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/operations.py\", line 719, in process\n    delayed_application = self.dofn_runner.process(o)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1241, in process\n    self._reraise_augmented(exn)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1321, in _reraise_augmented\n    raise_with_traceback(new_exn)\n  File \"/usr/local/lib/python3.7/dist-packages/future/utils/__init__.py\", line 446, in raise_with_traceback\n    raise exc.with_traceback(traceback)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 1239, in process\n    return self.do_fn_invoker.invoke_process(windowed_value)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 769, in invoke_process\n    windowed_value, additional_args, additional_kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py\", line 893, in _invoke_process_per_window\n    self.process_method(*args_for_process),\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/transforms/core.py\", line 1590, in <lambda>\n    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]\n  File 
\"/usr/local/lib/python3.7/dist-packages/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py\", line 37, in _copy_tree_to_unique_temp_dir\n    _copy_tree(source, destination)\n  File \"/usr/local/lib/python3.7/dist-packages/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py\", line 51, in _copy_tree\n    os.path.join(source, filename), os.path.join(destination, filename))\n  File \"/usr/local/lib/python3.7/dist-packages/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py\", line 53, in _copy_tree\n    tf.io.gfile.copy(source, destination)\n  File \"/usr/local/lib/python3.7/dist-packages/tensorflow/python/lib/io/file_io.py\", line 513, in copy_v2\n    compat.as_bytes(src), compat.as_bytes(dst), overwrite)\nRuntimeError: tensorflow.python.framework.errors_impl.AbortedError: All 10 retry attempts failed. The last failure: Unknown: : No response body. [while running \'WriteTransformFn/WriteTransformFnToTemp\']\n\n" instruction_id: "bundle_170" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:296" thread: "Thread-12" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 21243095 } message: "No more requests from control plane" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:266" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 21665096 } message: "SDK Harness waiting for in-flight requests to complete" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:267" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 21919965 } message: "Closing all cached grpc data channels." log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/data_plane.py:721" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 22111654 } message: "Closing all cached gRPC state handlers." log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:891" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 31976699 } message: "Done consuming work." log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:279" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 32243013 } message: "Python sdk harness exiting." log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker_main.py:166" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 851266622 } message: "No more requests from control plane" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:266" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 851534366 } message: "SDK Harness waiting for in-flight requests to complete" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:267" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 851665258 } message: "Closing all cached grpc data channels." log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/data_plane.py:721" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 851786136 } message: "Closing all cached gRPC state handlers." log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:891" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 859477281 } message: "Done consuming work." log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:279" thread: "MainThread" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1612267137   nanos: 859626293 } message: "Python sdk harness exiting." log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker_main.py:166" thread: "MainThread" 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py", line 1239, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py", line 769, in invoke_process
    windowed_value, additional_args, additional_kwargs)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/common.py", line 893, in _invoke_process_per_window
    self.process_method(*args_for_process),
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/transforms/core.py", line 1590, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/usr/local/lib/python3.7/dist-packages/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py", line 37, in _copy_tree_to_unique_temp_dir
    _copy_tree(source, destination)
  File "/usr/local/lib/python3.7/dist-packages/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py", line 51, in _copy_tree
    os.path.join(source, filename), os.path.join(destination, filename))
  File "/usr/local/lib/python3.7/dist-packages/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py", line 53, in _copy_tree
    tf.io.gfile.copy(source, destination)
  File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/lib/io/file_io.py", line 513, in copy_v2
    compat.as_bytes(src), compat.as_bytes(dst), overwrite)
tensorflow.python.framework.errors_impl.AbortedError: All 10 retry attempts failed. The last failure: Unknown: : No response body.

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 22 (14 by maintainers)

Most upvoted comments

@arghyaganguly I’m using TFX 0.26.1 with its dependencies.

@Bobgy @PatrickXYS TFX now supports a recent enough Beam version to run on S3, and the other components (e.g. ExampleGen, SchemaGen, StatisticsGen) all work as expected. The issue is solely with Transform, so it is likely a TFX issue rather than a Kubeflow or KFP one.

The actual call failing is tf.io.gfile.copy(source, destination). I think someone from TFX should look at this.
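To check the copy outside of Beam, something like the following might help. It is a rough sketch of the recursive copy that _copy_tree appears to perform, reconstructed only from the traceback above and not taken from the tensorflow_transform source. The name copy_tree and its isdir/listdir/makedirs/copy parameters are hypothetical. By default it uses the local filesystem so it runs without TensorFlow or S3; the comment at the bottom shows how the tf.io.gfile functions could be swapped in to point it at the bucket.

```python
import os
import shutil
import tempfile


def copy_tree(source, destination,
              isdir=os.path.isdir,
              listdir=os.listdir,
              makedirs=lambda path: os.makedirs(path, exist_ok=True),
              copy=shutil.copyfile):
    """Recursively copy source into destination, one file at a time.

    Hypothetical helper: the filesystem functions are injectable so the
    same walk can be run against local disk or against tf.io.gfile.
    Returns the list of (src, dst) file pairs that were copied.
    """
    copied = []
    if isdir(source):
        makedirs(destination)
        for filename in sorted(listdir(source)):
            copied += copy_tree(
                os.path.join(source, filename),
                os.path.join(destination, filename),
                isdir, listdir, makedirs, copy)
    else:
        # Log each file before copying so a failing copy is easy to spot.
        print(f"copying {source} -> {destination}")
        copy(source, destination)
        copied.append((source, destination))
    return copied


if __name__ == "__main__":
    # Local demo with a layout loosely resembling a SavedModel directory.
    root = tempfile.mkdtemp()
    src = os.path.join(root, "src")
    os.makedirs(os.path.join(src, "assets"))
    with open(os.path.join(src, "saved_model.pb"), "w") as f:
        f.write("placeholder")
    copy_tree(src, os.path.join(root, "dst"))

    # Assumption: to try this against S3, pass the TensorFlow file APIs:
    #   import tensorflow as tf
    #   copy_tree("s3://<bucket>/<src>", "s3://<bucket>/<dst>",
    #             isdir=tf.io.gfile.isdir, listdir=tf.io.gfile.listdir,
    #             makedirs=tf.io.gfile.makedirs, copy=tf.io.gfile.copy)
```

If the S3 variant fails on the same file with the same "No response body" error, that would suggest the problem sits in the TensorFlow S3 filesystem or the Minio setup rather than in Beam or the Transform component itself.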