pangeo-forge-recipes: #425 broke the example notebooks on the `beam-refactor` branch

I was just trying to run the CMIP6 example notebook with the latest commit of the beam-refactor branch.

Simplified code:

import os
from tempfile import TemporaryDirectory
import pandas as pd
import xarray as xr
import s3fs
from pangeo_forge_recipes.patterns import ConcatDim
from pangeo_forge_recipes.patterns import FilePattern
from pangeo_forge_recipes.transforms import Indexed, T
import apache_beam as beam
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr

input_urls = [
    's3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_185001-194912.nc',
    's3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_195001-201412.nc'
]

td = TemporaryDirectory()
target_root = td.name
store_name = "output.zarr"
target_store = os.path.join(target_root, store_name)
target_chunks = {'bnds': 2, 'lat': 180, 'lon': 288, 'time': 241}

def make_full_path(time):
    '''
    Parameters
    ----------
    time : str
        A 13-character string composed of two 6-character dates delimited by a dash.
        The first four characters of each date are the year, and the final two are the month.
        For example, the time range from Jan 1850 through Dec 1949 is expressed as '185001-194912'.
    '''
    base_url = 's3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/'
    return base_url + f'tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_{time}.nc'

time_concat_dim = ConcatDim("time", keys=['185001-194912', '195001-201412'])
pattern = FilePattern(make_full_path, time_concat_dim, fsspec_open_kwargs={'anon':True})

transforms = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec(open_kwargs={'anon':True})
    | OpenWithXarray(file_type=pattern.file_type)
    | StoreToZarr(
        store_name=store_name,
        target_root=target_root,
        combine_dims=pattern.combine_dim_keys,
        target_chunks=target_chunks
    )
)

with beam.Pipeline() as p:
    p | transforms
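The recipe's FilePattern builds each input URL by passing one ConcatDim key to make_full_path, so the key format described in the docstring can be sanity-checked in plain Python without touching S3. This is a minimal sketch: parse_time_range and count_months are hypothetical helpers, not part of pangeo-forge-recipes.

```python
# Hypothetical helpers (not part of pangeo-forge-recipes) for the
# 'YYYYMM-YYYYMM' keys described in the make_full_path docstring.
def parse_time_range(time: str) -> tuple[tuple[int, int], tuple[int, int]]:
    """Split e.g. '185001-194912' into ((1850, 1), (1949, 12))."""
    if len(time) != 13 or time[6] != "-":
        raise ValueError(f"expected 'YYYYMM-YYYYMM', got {time!r}")
    start, end = time.split("-")
    return (int(start[:4]), int(start[4:])), (int(end[:4]), int(end[4:]))


def count_months(time: str) -> int:
    """Number of monthly steps in the range, counting both end months."""
    (y0, m0), (y1, m1) = parse_time_range(time)
    return (y1 - y0) * 12 + (m1 - m0) + 1


BASE_URL = (
    "s3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/"
    "r1i1p1f1/Amon/tas/gr1/v20180701/"
)


def make_full_path(time: str) -> str:
    # Same URL template as the recipe above.
    return BASE_URL + f"tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_{time}.nc"


for key in ["185001-194912", "195001-201412"]:
    # Prints 1200 months for the first file and 780 for the second.
    print(key, count_months(key), make_full_path(key))
```

The reconstructed URLs should match the entries in input_urls exactly.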

I am running into an error:

Traceback (most recent call last):
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 42, in _store_data
    assert (region_slice.stop % chunksize == 0) or (region_slice.stop == dimsize)
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 87, in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 44, in _store_data
    raise ValueError(
ValueError: Region (slice(0, 288, None), slice(0, 241, None), slice(0, 180, None), slice(0, 2, None)) does not align with Zarr chunks (180, 2).

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3460, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/var/folders/_1/1k9jtjl51z333f21s7yht0340000gn/T/ipykernel_14155/3241349435.py", line 54, in <module>
    with beam.Pipeline() as p:
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/pipeline.py", line 600, in __exit__
    self.result = self.run()
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/pipeline.py", line 577, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages
    bundle_results = self._execute_bundle(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle
    self._run_bundle(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1012, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1348, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
    response = self.worker.do_instruction(request)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 624, in do_instruction
    return getattr(self, request_type)(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 662, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1062, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1420, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1492, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1582, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1695, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1420, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1508, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 87, in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 44, in _store_data
    raise ValueError(
ValueError: Region (slice(0, 288, None), slice(0, 241, None), slice(0, 180, None), slice(0, 2, None)) does not align with Zarr chunks (180, 2). [while running '[3]: Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr/StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 2057, in showtraceback
    stb = self.InteractiveTB.structured_traceback(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1288, in structured_traceback
    return FormattedTB.structured_traceback(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1177, in structured_traceback
    return VerboseTB.structured_traceback(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1030, in structured_traceback
    formatted_exception = self.format_exception_as_a_whole(etype, evalue, etb, number_of_lines_of_context,
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/IPython/core/ultratb.py", line 960, in format_exception_as_a_whole
    frames.append(self.format_record(record))
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/IPython/core/ultratb.py", line 870, in format_record
    frame_info.lines, Colors, self.has_colors, lvals
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/IPython/core/ultratb.py", line 704, in lines
    return self._sd.lines
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/stack_data/core.py", line 734, in lines
    pieces = self.included_pieces
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/stack_data/core.py", line 681, in included_pieces
    pos = scope_pieces.index(self.executing_piece)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/stack_data/core.py", line 660, in executing_piece
    return only(
  File "/Users/juliusbusecke/miniconda/envs/pangeo-forge-esgf/lib/python3.10/site-packages/executing/executing.py", line 190, in only
    raise NotOneValueFound('Expected one value, found 0')
executing.executing.NotOneValueFound: Expected one value, found 0
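The first failure is the assertion at writers.py line 42 (region_slice.stop % chunksize == 0 or region_slice.stop == dimsize). A minimal sketch that mirrors that condition, under the assumption that the target lat_bnds array has shape (180, 2), shows why the region from the ValueError cannot pass. region_aligns is an illustrative function, not the library's code.

```python
# Sketch mirroring the condition asserted at writers.py line 42 in the
# traceback; an illustration, not the library's implementation.
def region_aligns(region, chunks, shape):
    """True if every region slice stops on a chunk boundary or at the dim end."""
    for region_slice, chunksize, dimsize in zip(region, chunks, shape):
        stop = region_slice.stop
        if not (stop % chunksize == 0 or stop == dimsize):
            return False
    return True


# Region and chunks copied from the ValueError above; the (180, 2) shape of
# the 2-D lat_bnds array is an assumption.
region = (slice(0, 288), slice(0, 241), slice(0, 180), slice(0, 2))
zarr_chunks = (180, 2)
zarr_shape = (180, 2)

print(region_aligns(region, zarr_chunks, zarr_shape))  # False
# zip() only compares the first two slices; the real clue is the
# dimension count: the fragment has 4 dims, the Zarr array has 2.
print(len(region), len(zarr_chunks))  # 4 2
```

In this sketch the first slice, slice(0, 288), gets compared against a chunk size of 180, so the mismatch surfaces as a misleading alignment failure rather than as a dimensionality error.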

I confirmed that running this at commit 08364b5aad7762fde7773d1c2b6d0ff947abde62 does not produce the error, and the Zarr store is written successfully.

I also tested the commit from #425 itself, and it reproduces the error as well, so I conclude that something in #425 broke this particular example.

Does anyone have insight into how to fix this? I am currently refactoring my CMIP6 recipes for the beam-refactor branch and am blocked by this. For now, I will work with the code from the commit before #425.

About this issue

  • State: closed
  • Created a year ago
  • Reactions: 2
  • Comments: 15 (15 by maintainers)

Most upvoted comments

@rabernat’s suggestion in https://github.com/pangeo-forge/pangeo-forge-recipes/issues/504#issuecomment-1485235403 solved this issue for me when I just encountered it in https://github.com/pangeo-forge/pangeo-forge-runner/pull/64#issuecomment-1490976679. Had I not been following this thread, however, it would have been quite difficult for me to infer from the error message that arose there

File ".../pangeo_forge_recipes/writers.py", line 87, in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
File ".../pangeo_forge_recipes/writers.py", line 44, in _store_data
    raise ValueError(
ValueError: Region (slice(1, 2, None), slice(None, None, None), slice(None, None, None)) does not align with Zarr chunks (180, 2).

that the solution could be as simple as {"decode_coords": "all"}.

I propose we consider adding a bit more detail to the error message in _store_data:

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/661d13b692492ff5b127f81a6d3d3dd3a3c63798/pangeo_forge_recipes/writers.py#L27-L47

At the very least, perhaps we should add:

 raise ValueError(
     f"Region {region} does not align with Zarr chunks {zarr_array.chunks} "
+    f"in zarr array '{vname}'."
 )

This would give the user some idea of which array was causing the problem. Then, if vname contains the substring _bounds or similar, the decode_coords solution might be more apparent.
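Going one step beyond naming the variable, the message could also flag a dimensionality mismatch, which is what actually happened in both tracebacks in this thread. The sketch below is hypothetical: check_region is an illustrative name and signature, not part of the pangeo-forge-recipes API, and the variable name and (180, 2) shape in the usage example are made up.

```python
# Hypothetical sketch of the proposed error message; check_region is an
# illustrative name, not part of the pangeo-forge-recipes API.
def check_region(vname, region, chunks, shape):
    """Raise ValueError if `region` cannot be written to Zarr array `vname`."""
    msg = (
        f"Region {region} does not align with Zarr chunks {chunks} "
        f"in zarr array '{vname}'."
    )
    if len(region) != len(chunks):
        msg += f" The region has {len(region)} dims but the array has {len(chunks)}."
        if "bnds" in vname or "bounds" in vname:
            msg += " For bounds variables, try opening with decode_coords='all'."
        raise ValueError(msg)
    for region_slice, chunksize, dimsize in zip(region, chunks, shape):
        # A stop of None means "up to the end of the dimension".
        stop = dimsize if region_slice.stop is None else region_slice.stop
        if stop % chunksize != 0 and stop != dimsize:
            raise ValueError(msg)


try:
    # Region and chunks from the error quoted above; the variable name
    # and array shape are made up for illustration.
    check_region(
        "lat_bnds",
        (slice(1, 2), slice(None), slice(None)),
        chunks=(180, 2),
        shape=(180, 2),
    )
except ValueError as err:
    print(err)
```

Checking the dimension count before comparing slices keeps the hint specific to the case where a variable has picked up extra dimensions.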

OK, I found a very simple workaround that makes this recipe work:

- target_chunks = {'bnds': 2, 'lat': 180, 'lon': 288, 'time': 241}
+ target_chunks = {'time': 241}

It turns out that explicitly specifying target_chunks for the dimensions that are not being chunked (bnds, lat, lon) triggers the error. It’s still a bug, but I think it is easily fixable.
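The workaround amounts to listing only the dimensions that are actually split into more than one chunk. A hypothetical helper that derives that smaller dict might look like this; it assumes you already know the full dimension sizes (here lat=180, lon=288 and bnds=2, as suggested by the region slices in the traceback), and it keeps any dimension whose size is not given, such as the concatenated time dimension.

```python
# Hypothetical helper, not part of pangeo-forge-recipes: drop target_chunks
# entries that would span the whole dimension anyway.
def drop_unchunked_dims(target_chunks: dict, dim_sizes: dict) -> dict:
    """Keep only entries that split a dimension (or whose size is unknown)."""
    return {
        dim: size
        for dim, size in target_chunks.items()
        if dim not in dim_sizes or size < dim_sizes[dim]
    }


target_chunks = {'bnds': 2, 'lat': 180, 'lon': 288, 'time': 241}
# Assumed full sizes; time is left out because it is the concat dimension.
dim_sizes = {'bnds': 2, 'lat': 180, 'lon': 288}

print(drop_unchunked_dims(target_chunks, dim_sizes))  # {'time': 241}
```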

I did a little digging and narrowed the problem down to this line:

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/661d13b692492ff5b127f81a6d3d3dd3a3c63798/pangeo_forge_recipes/rechunking.py#L211

In this line, tas gains an extra bnds dimension.
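It is not confirmed here what rechunking.py line 211 does internally. As a generic illustration of how an xarray data variable can pick up dimensions it does not own, and of a hypothetical debugging helper (added_dims) for spotting that, here is a toy dataset with made-up sizes that only loosely mimics tas and lat_bnds.

```python
import numpy as np
import xarray as xr


def added_dims(before: xr.Dataset, after: xr.Dataset) -> dict:
    """Map each shared data variable to the dimensions it gained."""
    return {
        name: tuple(d for d in after[name].dims if d not in before[name].dims)
        for name in before.data_vars
        if name in after.data_vars
    }


# Toy dataset with made-up sizes, loosely shaped like the CMIP6 file.
ds = xr.Dataset(
    {
        "tas": (("time", "lat", "lon"), np.zeros((3, 4, 5))),
        "lat_bnds": (("lat", "bnds"), np.zeros((4, 2))),
    }
)

# Broadcasting two variables against each other gives each one the union of
# their dimensions, so tas picks up bnds and lat_bnds picks up time and lon.
tas_b, lat_bnds_b = xr.broadcast(ds["tas"], ds["lat_bnds"])
after = xr.Dataset({"tas": tas_b, "lat_bnds": lat_bnds_b})

print(added_dims(ds, after))
```

Running a check like this before and after a suspect transform is one way to pinpoint where a variable first gains the extra dimension.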

Oh, interesting. Let me double-check my dependencies. It would be nice if this is the solution; then I can rebase my CMIP6 PGF recipes onto the latest commit of this branch too.

It seems like lat_bnds should be a coordinate, not a data variable. Can you try opening the dataset with decode_coords="all", i.e.:

OpenWithXarray(xarray_open_kwargs={"decode_coords": "all"})
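In xarray, decode_coords="all" also promotes variables referenced by CF attributes such as bounds to coordinates, whereas the default (decode_coords=True) only looks at the coordinates attribute. The in-memory sketch below uses xr.decode_cf, which accepts the same decode_coords argument as xr.open_dataset; the toy dataset is made up and only mimics the relevant CF attribute on lat.

```python
import numpy as np
import xarray as xr

# Toy stand-in for the CMIP6 file: 'lat' points to its bounds via the CF
# "bounds" attribute, and 'lat_bnds' is stored as a data variable.
raw = xr.Dataset(
    {
        "tas": (("lat", "lon"), np.zeros((4, 5))),
        "lat_bnds": (("lat", "bnds"), np.zeros((4, 2))),
    },
    coords={
        "lat": ("lat", np.linspace(-67.5, 67.5, 4), {"bounds": "lat_bnds"}),
        "lon": ("lon", np.arange(5.0)),
    },
)

default = xr.decode_cf(raw)  # decode_coords=True: 'bounds' is not used
all_coords = xr.decode_cf(raw, decode_coords="all")

print("lat_bnds" in default.data_vars)  # True
print("lat_bnds" in all_coords.coords)  # True
```

With lat_bnds as a coordinate, only tas remains a data variable to be written fragment by fragment. In the recipe, the option would be passed through OpenWithXarray's xarray_open_kwargs as suggested above.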

Thanks for the reply @derekocallaghan.

I don’t know why the problem is occurring, but in writers._store_data(), the input lat_bnds variable used to generate region has four dimensions, while the corresponding Zarr array has only two, i.e. the same as the data variable in the original CMIP6 dataset. I’m assuming the Zarr array should also have four dimensions?

I think it may be the other way around: two dimensions (lat, bnds) is the right dimensionality for this data variable in this particular dataset, so it seems the region generation has a bug here.

I will try to dig a bit deeper into this later today.