pangeo-forge-recipes: `OSError: {'We encountered an internal error. Please try again.'}` during `finalize_target`.
A significant error consistently occurs for datasets above a certain scale (>1 TiB) during the finalize_target step of the XarrayZarrRecipe. In that step, while consolidating dimension coordinates, the recipe overwrites an existing Zarr array for a dimension. Overwriting makes zarr remove the old array first, which asks the underlying filesystem implementation (here gcsfs) to delete all of that array's files. During that delete, gcsfs raises an OSError reporting an internal server error.
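One plausible reading of the full trace: the store zarr received falls back to `_rmdir_from_keys`, which deletes the array's keys one request at a time, and fsspec's mapper converts any backend error on a single delete into a KeyError. A dimension coordinate written in many chunks means many delete requests, so a transient server error becomes more likely as the dataset grows. The toy model below illustrates that failure mode under stated assumptions: `FlakyStore` and `rmdir_from_keys` are hypothetical stand-ins (not zarr or gcsfs code), and the store simply fails every N-th delete.

```python
from collections.abc import MutableMapping


class FlakyStore(MutableMapping):
    """Hypothetical in-memory key/value store (not gcsfs) whose every
    `fail_every`-th delete fails, standing in for a transient server error."""

    def __init__(self, data, fail_every):
        self._data = dict(data)
        self._fail_every = fail_every
        self.delete_calls = 0

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        self.delete_calls += 1
        if self.delete_calls % self._fail_every == 0:
            # Like fsspec's mapper in the trace, surface the backend failure as KeyError.
            raise KeyError(key)
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


def rmdir_from_keys(store, path):
    """Delete every key under `path`, one delete per key (modeled on zarr's fallback)."""
    prefix = path.rstrip("/") + "/"
    for key in [k for k in store if k.startswith(prefix)]:
        del store[key]


# Hypothetical array: 1000 chunks plus its metadata key.
data = {f"time/{i}": b"" for i in range(1000)}
data["time/.zarray"] = b"{}"
store = FlakyStore(data, fail_every=500)
try:
    rmdir_from_keys(store, "time")
except KeyError as err:
    print(f"rmdir aborted on {err} after {store.delete_calls} delete calls")
print(f"{len(store)} keys left behind")
```

With these numbers the loop stops at the 500th delete and leaves the array roughly half removed, which matches the shape of the trace: one failed delete aborts the whole overwrite.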
So far, I’ve investigated a few possible causes. I first suspected a Beam-specific race condition (I tried to fix it here). However, I was also able to reproduce the issue on a single worker, by running the pure-Python finalize_target step on one VM, which argues against a race between workers.
I’ll add more context to this issue later today, but this is a good-enough summary of what I’m experiencing.
Full trace
Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/fsspec/mapping.py", line 167, in __delitem__
self.fs.rm(self._key_to_str(key))
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 111, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 96, in sync
raise return_result
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 53, in _runner
result[0] = await coro
File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 987, in _rm
raise exs[0]
File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 952, in _rm_files
raise OSError(out)
OSError: {'We encountered an internal error. Please try again.'}
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/opt/homebrew/Caskroom/miniconda/base/envs/era5/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1843, in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
File "/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/executors/beam.py", line 14, in _no_arg_stage
fun(config=config)
File "/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py", line 659, in finalize_target
new = group.array(
File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 1091, in array
return self._write_op(self._array_nosync, name, data, **kwargs)
File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 800, in _write_op
return f(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 1097, in _array_nosync
return array(data, store=self._store, path=path, chunk_store=self._chunk_store,
File "/usr/local/lib/python3.8/site-packages/zarr/creation.py", line 377, in array
z = create(**kwargs)
File "/usr/local/lib/python3.8/site-packages/zarr/creation.py", line 161, in create
init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor,
File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 419, in init_array
_init_array_metadata(store, shape=shape, chunks=chunks, dtype=dtype,
File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 451, in _init_array_metadata
rmdir(store, path)
File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 181, in rmdir
store.rmdir(path) # type: ignore
File "/usr/local/lib/python3.8/site-packages/zarr/_storage/store.py", line 154, in rmdir
_rmdir_from_keys(self, path)
File "/usr/local/lib/python3.8/site-packages/zarr/_storage/store.py", line 387, in _rmdir_from_keys
del store[key]
File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 723, in __delitem__
del self._mutable_mapping[key]
File "/usr/local/lib/python3.8/site-packages/fsspec/mapping.py", line 169, in __delitem__
raise KeyError
KeyError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/opt/homebrew/Caskroom/miniconda/base/envs/era5/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1843, in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
File "/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/executors/beam.py", line 14, in _no_arg_stage
fun(config=config)
File "/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py", line 659, in finalize_target
new = group.array(
File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 1091, in array
return self._write_op(self._array_nosync, name, data, **kwargs)
File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 800, in _write_op
return f(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 1097, in _array_nosync
return array(data, store=self._store, path=path, chunk_store=self._chunk_store,
File "/usr/local/lib/python3.8/site-packages/zarr/creation.py", line 377, in array
z = create(**kwargs)
File "/usr/local/lib/python3.8/site-packages/zarr/creation.py", line 161, in create
init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor,
File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 419, in init_array
_init_array_metadata(store, shape=shape, chunks=chunks, dtype=dtype,
File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 451, in _init_array_metadata
rmdir(store, path)
File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 181, in rmdir
store.rmdir(path) # type: ignore
File "/usr/local/lib/python3.8/site-packages/zarr/_storage/store.py", line 154, in rmdir
_rmdir_from_keys(self, path)
File "/usr/local/lib/python3.8/site-packages/zarr/_storage/store.py", line 387, in _rmdir_from_keys
del store[key]
File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 723, in __delitem__
del self._mutable_mapping[key]
File "/usr/local/lib/python3.8/site-packages/fsspec/mapping.py", line 169, in __delitem__
raise KeyError
RuntimeError: KeyError [while running 'Start|cache_input|Reshuffle_000|prepare_target|Reshuffle_001|store_chunk|Reshuffle_002|finalize_target|Reshuffle_003/finalize_target-ptransform-56']
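The server's message ("Please try again") suggests the failure may be transient. If so, one workaround to experiment with is retrying with exponential backoff. This is a minimal sketch, assuming the failing call can safely be repeated; `retry`, `flaky_delete`, and the `gs://bucket/store.zarr/time` path are hypothetical and not part of pangeo-forge-recipes, zarr, fsspec, or gcsfs. Note that, as the trace shows, fsspec's mapper re-raises the OSError as a KeyError, so code calling through the mapper would see a KeyError instead, which `retry_on` would have to account for.

```python
import time


def retry(func, *args, attempts=5, base_delay=0.5, retry_on=(OSError,),
          sleep=time.sleep, **kwargs):
    """Call func(*args, **kwargs), retrying on `retry_on` with exponential backoff.

    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts and
    re-raises the last error once `attempts` calls have failed.
    """
    for attempt in range(attempts):
        try:
            return func(*args, **kwargs)
        except retry_on:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Hypothetical delete that fails twice with the error text from the trace.
calls = {"n": 0}


def flaky_delete(path):
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError({"We encountered an internal error. Please try again."})
    return f"deleted {path}"


delays = []  # record the waits instead of actually sleeping
print(retry(flaky_delete, "gs://bucket/store.zarr/time", sleep=delays.append))
print(calls["n"], delays)
```

Injecting `sleep` keeps the sketch testable; in real use the default `time.sleep` applies.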
About this issue
- State: closed
- Created 2 years ago
- Comments: 16 (16 by maintainers)
It actually calls rmdir(path, recursive=True), which does delete everything. Zarr has FSStore, a thin layer over fsspec with its own rmdir implementation, and that is what should be passed. If you pass a URL to zarr rather than an instantiated mapper, you will get an FSStore.
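To illustrate the difference the comment describes, here is a toy model that counts delete calls issued by the store layer. Assumptions: `RecordingFS`, `KeyByKeyStore`, `RecursiveStore`, and `delete_calls` are hypothetical. `KeyByKeyStore` mimics the key-by-key fallback seen in the trace (`_rmdir_from_keys`), and `RecursiveStore` mimics a store whose rmdir hands the whole prefix to the filesystem as one recursive delete. How gcsfs executes a recursive delete internally (it may still send several batched requests) is outside this sketch.

```python
class RecordingFS:
    """Hypothetical filesystem that records every delete call it receives."""

    def __init__(self, paths):
        self.paths = set(paths)
        self.calls = []

    def rm(self, path, recursive=False):
        self.calls.append((path, recursive))
        if recursive:
            prefix = path.rstrip("/") + "/"
            self.paths = {p for p in self.paths if not p.startswith(prefix)}
        else:
            self.paths.discard(path)


class KeyByKeyStore:
    """Store without a smarter rmdir: one delete per key, as in the trace."""

    def __init__(self, fs):
        self.fs = fs

    def rmdir(self, path):
        prefix = path.rstrip("/") + "/"
        # sorted() snapshots the matching keys before any are removed.
        for key in sorted(p for p in self.fs.paths if p.startswith(prefix)):
            self.fs.rm(key)


class RecursiveStore(KeyByKeyStore):
    """FSStore-like store: hands the whole prefix to the filesystem at once."""

    def rmdir(self, path):
        self.fs.rm(path, recursive=True)


def delete_calls(store_cls, n_chunks):
    """Remove a hypothetical `time` array of n_chunks chunks; report calls and survivors."""
    fs = RecordingFS(
        [f"store/time/{i}" for i in range(n_chunks)]
        + ["store/time/.zarray", "store/lat/0"]
    )
    store_cls(fs).rmdir("store/time")
    return len(fs.calls), sorted(fs.paths)


for cls in (KeyByKeyStore, RecursiveStore):
    print(cls.__name__, delete_calls(cls, 1000))
```

With 1000 chunk keys, the key-by-key store issues 1001 separate delete calls, each of which can fail on its own, whereas the recursive store issues one; both leave the unrelated `store/lat/0` key intact.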