pangeo-forge-recipes: Xarray-to-Zarr recipe runs out of memory

I’m following the Pangeo Forge tutorial Xarray-to-Zarr Sequential Recipe: NOAA OISST to create a recipe for CEDA monthly daytime land surface temperature data, but I’m running into trouble with pangeo-forge-recipes version 0.10.0 (installed from conda-forge).

Here’s my code in recipe.py (I’m using Python 3.11):

import os
from tempfile import TemporaryDirectory

import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

url_pattern = (
    "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
    "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
    "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
)
months = pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
urls = tuple(url_pattern.format(time=month) for month in months)
# Prune to 1 element to minimize memory reqs for now
pattern = pattern_from_file_sequence(urls, "time", nitems_per_file=1).prune(1)

temp_dir = TemporaryDirectory()
target_root = temp_dir.name
store_name = "output.zarr"
target_store = os.path.join(target_root, store_name)

transforms = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type)
    | StoreToZarr(
        target_root=target_root,
        store_name=store_name,
        combine_dims=pattern.combine_dim_keys,
        target_chunks={"time": 1, "lat": 5, "lon": 5},
    )
)

print(f"{pattern=}")
print(f"{target_store=}")
print(f"{transforms=}")

with beam.Pipeline() as p:
    p | transforms  # type: ignore[reportUnusedExpression]

with xr.open_zarr(target_store) as ds:
    print(ds)

When I run this, the process is eventually killed after consuming an enormous amount of memory. I saw the Python process exceed 40 GB (on my 16 GB machine), and it may well have gone beyond that while I wasn’t watching; it ran for about 3.5 hours:

$ time python recipe.py
pattern=<FilePattern {'time': 1}>
target_store='/var/folders/v_/q9ql2x2n3dlg2td_b6xkcjzw0000gn/T/tmpozgcr3ng/output.zarr'
transforms=<_ChainedPTransform(PTransform) label=[Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr] at 0x162819b90>
...
.../python3.11/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
Killed: 9

real    216m31.108s
user    76m14.794s
sys     90m21.965s
.../python3.11/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '

I’m going to downgrade pangeo-forge-recipes to a version prior to the recently introduced breaking API changes, to see whether I hit the same problem with the old API. In the meantime, is there anything glaringly wrong with what I’ve written above that would cause the memory issue?


Most upvoted comments

Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?

Ok, I see. Yes, this is definitely the case for the original file. For my recipe, however, the target dataset was all float64. That’s not good. 😕
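
For what it’s worth, the dtype makes exactly a 2x difference in the chunk-size arithmetic. A quick back-of-the-envelope sketch, using the 3600 x 7200 chunking mentioned below:

    # Per-chunk size for a 3600 x 7200 lat/lon chunk:
    n = 3600 * 7200          # 25,920,000 values
    print(n * 4 / 2**20)     # ~98.9 MiB at float32
    print(n * 8 / 2**20)     # ~197.8 MiB at float64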

I finally got a successful run. I used target_chunks={"lat": 3600, "lon": 7200} and it took ~30 minutes to complete.

I now see the float64 result you mentioned, even though the original is float32. Is there any way for us to prevent this unwanted conversion?
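
One thing that may be worth trying, assuming the promotion happens during xarray’s CF decoding when the files are opened (an assumption, not something I’ve verified against these files): disable mask-and-scale decoding via OpenWithXarray’s xarray_open_kwargs, which I believe are forwarded to xr.open_dataset:

    # Hedged sketch: keep the on-disk dtypes by skipping mask/scale decoding.
    # Assumes the float64 promotion comes from CF decoding of these files.
    OpenWithXarray(
        file_type=pattern.file_type,
        xarray_open_kwargs={"mask_and_scale": False},
    )

The trade-off is that fill values are then written as-is rather than decoded to NaN, so consumers of the Zarr store would need to handle them.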

I keep getting either aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed or fsspec.exceptions.FSTimeoutError. Perhaps fiddling with the chunk size will get me past them.

Do you know if this timeout is coming from the upstream CEDA server?

I suspect the problem may be on the CEDA side. While using the CEDA OPeNDAP server for some experimental work (which is what led me to look at writing this recipe, on the advice of @sharkinsspatial), we’ve experienced a great deal of flakiness.
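
If the failures are client-side timeouts rather than server drops, lengthening aiohttp’s timeout might paper over some of the flakiness. A minimal sketch, assuming OpenURLWithFSSpec forwards open_kwargs to fsspec’s HTTP filesystem (my reading of the 0.10.0 transforms, not a verified API guarantee):

    import aiohttp

    # Hypothetical usage: give the HTTP client a much longer total timeout.
    open_urls = OpenURLWithFSSpec(
        open_kwargs={"client_kwargs": {"timeout": aiohttp.ClientTimeout(total=3600)}},
    )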

This is the problem:

        target_chunks={"time": 1, "lat": 5, "lon": 5},

You are trying to shatter this very large dataset into tiny, tiny chunks, which overwhelms Beam with a vast number of tasks.

The original dataset dimensions are {'time': 1, 'lat': 18000, 'lon': 36000, 'length_scale': 1, 'channel': 2}, so this pipeline creates 25,920,000 separate tasks (one for each chunk).
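
For concreteness, that count falls straight out of the arithmetic:

    # 5 x 5 spatial chunks over an 18000 x 36000 grid:
    n_chunks = (18000 // 5) * (36000 // 5)   # 3600 * 7200
    print(n_chunks)                          # 25920000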

Beyond the inefficient pipeline, a Zarr store with such tiny chunks would be extremely hard to use. We generally aim for chunks of 1-100 MB. The following chunks worked for me:

        target_chunks={"lat": 1800, "lon": 1800},

However, I then hit a new error:

IndexError: too many indices for array; expected 1, got 4

which I swear I have seen before but can’t remember where.

/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
[the preceding SerializationWarning repeated 14 times in total]
/srv/conda/envs/notebook/lib/python3.10/site-packages/pandas/core/arrays/timedeltas.py:908: RuntimeWarning: invalid value encountered in cast
  base = data.astype(np.int64)
/srv/conda/envs/notebook/lib/python3.10/site-packages/pandas/core/arrays/timedeltas.py:912: RuntimeWarning: invalid value encountered in cast
  data = (base * m + (frac * m).astype(np.int64)).view("timedelta64[ns]")
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/coding/times.py:618: RuntimeWarning: invalid value encountered in cast
  int_num = np.asarray(num, dtype=np.int64)
Unexpected exception formatting exception. Falling back to standard exception
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 985, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 90, in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 50, in _store_data
    zarr_array[region] = data
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1497, in __setitem__
    self.set_basic_selection(pure_selection, value, fields=fields)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1593, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1981, in _set_basic_selection_nd
    indexer = BasicIndexer(selection, self)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 335, in __init__
    selection = replace_ellipsis(selection, array._shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 272, in replace_ellipsis
    check_selection_length(selection, shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 236, in check_selection_length
    err_too_many_indices(selection, shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/errors.py", line 70, in err_too_many_indices
    raise IndexError(
IndexError: too many indices for array; expected 1, got 4

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_1951/4238254113.py", line 1, in <module>
    with beam.Pipeline() as p:
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/pipeline.py", line 600, in __exit__
    self.result = self.run()
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/pipeline.py", line 577, in run
    return self.runner.run_pipeline(self, self._options)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 128, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages
    bundle_results = self._execute_bundle(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle
    self._run_bundle(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1020, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1356, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
    response = self.worker.do_instruction(request)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
    return getattr(self, request_type)(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1051, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 568, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1513, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1533, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 985, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 90, in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 50, in _store_data
    zarr_array[region] = data
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1497, in __setitem__
    self.set_basic_selection(pure_selection, value, fields=fields)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1593, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1981, in _set_basic_selection_nd
    indexer = BasicIndexer(selection, self)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 335, in __init__
    selection = replace_ellipsis(selection, array._shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 272, in replace_ellipsis
    check_selection_length(selection, shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 236, in check_selection_length
    err_too_many_indices(selection, shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/errors.py", line 70, in err_too_many_indices
    raise IndexError(
IndexError: too many indices for array; expected 1, got 4 [while running '[23]: Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr/StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 2105, in showtraceback
    stb = self.InteractiveTB.structured_traceback(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1428, in structured_traceback
    return FormattedTB.structured_traceback(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1319, in structured_traceback
    return VerboseTB.structured_traceback(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1172, in structured_traceback
    formatted_exception = self.format_exception_as_a_whole(etype, evalue, etb, number_of_lines_of_context,
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1087, in format_exception_as_a_whole
    frames.append(self.format_record(record))
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 969, in format_record
    frame_info.lines, Colors, self.has_colors, lvals
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 792, in lines
    return self._sd.lines
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/core.py", line 734, in lines
    pieces = self.included_pieces
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/core.py", line 681, in included_pieces
    pos = scope_pieces.index(self.executing_piece)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/core.py", line 660, in executing_piece
    return only(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/executing/executing.py", line 190, in only
    raise NotOneValueFound('Expected one value, found 0')
executing.executing.NotOneValueFound: Expected one value, found 0