intake-esm: Problems combining members with different runtime
I have had some trouble reading the IPSL model with intake-esm locally (on the Princeton server tigressdata) when attempting to read all available o2 data for the ssp585 scenario.
# First find all models with *any* o2 data on tigress
url = "/tigress/GEOCLIM/LRGROUP/jbusecke/code/intake-esm-datastore/catalogs/tigressdata-cmip6.json"
col = intake.open_esm_datastore(url)
cat = col.search(source_id='IPSL-CM6A-LR', variable_id='o2', experiment_id='ssp585')#, member_id='r1i1p1f1'
ddict = cat.to_dataset_dict(cdf_kwargs={'chunks': {'time':6},'decode_times': False,})
This throws an error:
variable_id grid_label dcpp_init_year version
kwargs: {}
Exception: ValueError("cannot reindex or align along dimension 'time' because the index has duplicate values")
ValueError                                Traceback (most recent call last)
<ipython-input-59-2073bdeb6e88> in <module>
      4 col = intake.open_esm_datastore(url)
      5 cat = col.search(source_id='IPSL-CM6A-LR', variable_id='o2', experiment_id='ssp585')#, member_id='r1i1p1f1'
----> 6 ddict = cat.to_dataset_dict(cdf_kwargs={'chunks': {'time':6},'decode_times': False,})# YUP this fails, when choosing all member_ids

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/intake_esm/core.py in to_dataset_dict(self, zarr_kwargs, cdf_kwargs, preprocess, aggregate, storage_options, progressbar)
    378 self.progressbar = progressbar
    379
--> 380 return self._open_dataset()
    381
    382 def _open_dataset(self):

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/intake_esm/core.py in _open_dataset(self)
    472 )
    473
--> 474 dsets = client.gather(futures)
    475 self._ds = {group_id: ds for (group_id, ds) in dsets}
    476

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1886 else:
   1887 local_worker = None
-> 1888 return self.sync(
   1889 self._gather,
   1890 futures,

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    775 return future
    776 else:
--> 777 return sync(
    778 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    779 )

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346 if error[0]:
    347 typ, exc, tb = error[0]
--> 348 raise exc.with_traceback(tb)
    349 else:
    350 return result[0]

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/distributed/utils.py in f()
    330 if callback_timeout is not None:
    331 future = asyncio.wait_for(future, callback_timeout)
--> 332 result[0] = yield future
    333 except Exception as exc:
    334 error[0] = sys.exc_info()

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733
    734 try:
--> 735 value = future.result()
    736 except Exception:
    737 exc_info = sys.exc_info()

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1751 exc = CancelledError(key)
   1752 else:
-> 1753 raise exception.with_traceback(traceback)
   1754 raise exc
   1755 if errors == "skip":

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/intake_esm/core.py in _load_group_dataset(key, df, col_data, agg_columns, aggregation_dict, path_column_name, variable_column_name, use_format_column, mapper_dict, zarr_kwargs, cdf_kwargs, preprocess)
    549 )
    550
--> 551 ds = _aggregate(
    552 aggregation_dict,
    553 agg_columns,

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/intake_esm/merge_util.py in _aggregate(aggregation_dict, agg_columns, n_agg, v, lookup, mapper_dict, zarr_kwargs, cdf_kwargs, preprocess)
    174 return ds
    175
--> 176 return apply_aggregation(v)
    177
    178

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/intake_esm/merge_util.py in apply_aggregation(v, agg_column, key, level)
    119 agg_options = {}
    120
--> 121 dsets = [
    122 apply_aggregation(value, agg_column, key=key, level=level + 1)
    123 for key, value in v.items()

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/intake_esm/merge_util.py in <listcomp>(.0)
    120
    121 dsets = [
--> 122 apply_aggregation(value, agg_column, key=key, level=level + 1)
    123 for key, value in v.items()
    124 ]

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/intake_esm/merge_util.py in apply_aggregation(v, agg_column, key, level)
    147 )
    148 varname = dsets[0].attrs['intake_esm_varname']
--> 149 ds = join_new(
    150 dsets,
    151 dim_name=agg_column,

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/intake_esm/merge_util.py in join_new(dsets, dim_name, coord_value, varname, options)
     23 except Exception as e:
     24 logger.error(f'Failed to join datasets along new dimension.')
---> 25 raise e
     26
     27

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/intake_esm/merge_util.py in join_new(dsets, dim_name, coord_value, varname, options)
     20 try:
     21 concat_dim = xr.DataArray(coord_value, dims=(dim_name), name=dim_name)
---> 22 return xr.concat(dsets, dim=concat_dim, data_vars=varname, **options)
     23 except Exception as e:
     24 logger.error(f'Failed to join datasets along new dimension.')

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/xarray/core/concat.py in concat(objs, dim, data_vars, coords, compat, positions, fill_value, join)
    133 "objects, got %s" % type(first_obj)
    134 )
--> 135 return f(objs, dim, data_vars, coords, compat, positions, fill_value, join)
    136
    137

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/xarray/core/concat.py in _dataset_concat(datasets, dim, data_vars, coords, compat, positions, fill_value, join)
    316 # Make sure we're working on a copy (we'll be loading variables)
    317 datasets = [ds.copy() for ds in datasets]
--> 318 datasets = align(
    319 *datasets, join=join, copy=False, exclude=[dim], fill_value=fill_value
    320 )

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/xarray/core/alignment.py in align(join, copy, indexes, exclude, fill_value, *objects)
    335 new_obj = obj.copy(deep=copy)
    336 else:
--> 337 new_obj = obj.reindex(copy=copy, fill_value=fill_value, **valid_indexers)
    338 new_obj.encoding = obj.encoding
    339 result.append(new_obj)

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/xarray/core/dataset.py in reindex(self, indexers, method, tolerance, copy, fill_value, **indexers_kwargs)
   2488
   2489 """
-> 2490 return self._reindex(
   2491 indexers,
   2492 method,

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/xarray/core/dataset.py in _reindex(self, indexers, method, tolerance, copy, fill_value, sparse, **indexers_kwargs)
   2517 raise ValueError("invalid reindex dimensions: %s" % bad_dims)
   2518
-> 2519 variables, indexes = alignment.reindex_variables(
   2520 self.variables,
   2521 self.sizes,

/tigress/jbusecke/code/conda/envs/euc_dynamics/lib/python3.8/site-packages/xarray/core/alignment.py in reindex_variables(variables, sizes, indexes, indexers, method, tolerance, copy, fill_value, sparse)
    545
    546 if not index.is_unique:
--> 547 raise ValueError(
    548 "cannot reindex or align along dimension %r because the "
    549 "index has duplicate values" % dim

ValueError: cannot reindex or align along dimension 'time' because the index has duplicate values
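For reference, the underlying xarray error can be reproduced in isolation. This is a minimal sketch with synthetic data (not the actual IPSL files): concatenating along a new dimension fails as soon as one of the datasets being combined carries a duplicated time value.

import numpy as np
import pandas as pd
import xarray as xr

# Member "a": a clean monthly time axis.
time_a = pd.date_range("2015-01-01", periods=3, freq="MS")
ds_a = xr.Dataset({"o2": ("time", np.zeros(3))}, coords={"time": time_a})

# Member "b": same period, but one timestamp occurs twice
# (e.g. when two versions of the same file end up in the catalog).
time_b = pd.DatetimeIndex([time_a[0], time_a[1], time_a[1]])
ds_b = xr.Dataset({"o2": ("time", np.ones(3))}, coords={"time": time_b})

# Concatenating along a new dimension triggers the same alignment step
# intake-esm uses and raises:
#   ValueError: cannot reindex or align along dimension 'time' because the
#   index has duplicate values
# (the exact wording may vary slightly between xarray versions).
xr.concat([ds_a, ds_b], dim="member_id")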
I think the problem is that one of the members (r1i1p1f1) has a longer run time than the others.
If you load only that member, it works:
cat = col.search(source_id='IPSL-CM6A-LR', variable_id='o2', experiment_id='ssp585',member_id='r1i1p1f1')
ddict = cat.to_dataset_dict(cdf_kwargs={'chunks': {'time':6},'decode_times': True,})
ddict
{'ScenarioMIP.IPSL.IPSL-CM6A-LR.ssp585.Omon.gn': <xarray.Dataset>
Dimensions: (axis_nbounds: 2, member_id: 1, nvertex: 4, olevel: 75, time: 3432, x: 362, y: 332)
Coordinates:
nav_lat (y, x) float32 dask.array<chunksize=(332, 362), meta=np.ndarray>
nav_lon (y, x) float32 dask.array<chunksize=(332, 362), meta=np.ndarray>
* time (time) object 2015-01-16T12:00:00 ... 2300-12-16 12:00:00
* olevel (olevel) float32 0.50576 1.5558553 ... 5698.0605 5902.0576
* member_id (member_id) <U8 'r1i1p1f1'
Dimensions without coordinates: axis_nbounds, nvertex, x, y
Data variables:
bounds_nav_lon (time, y, x, nvertex) float32 dask.array<chunksize=(1032, 332, 362, 4), meta=np.ndarray>
bounds_nav_lat (time, y, x, nvertex) float32 dask.array<chunksize=(1032, 332, 362, 4), meta=np.ndarray>
time_bounds (time, axis_nbounds) object dask.array<chunksize=(6, 2), meta=np.ndarray>
olevel_bounds (time, olevel, axis_nbounds) float32 dask.array<chunksize=(1032, 75, 2), meta=np.ndarray>
area (time, y, x) float32 dask.array<chunksize=(1032, 332, 362), meta=np.ndarray>
o2 (member_id, time, olevel, y, x) float32 dask.array<chunksize=(1, 6, 75, 332, 362), meta=np.ndarray>
But you can also see that it has 3000+ timesteps (the run extends to 2300).
For comparison, the other members only have ~1000 timesteps (the standard period until 2100):
cat = col.search(source_id='IPSL-CM6A-LR', variable_id='o2', experiment_id='ssp585',member_id='r2i1p1f1')#,
ddict = cat.to_dataset_dict(cdf_kwargs={'chunks': {'time':6},'decode_times': True,})# YUP this fails, when choosing all member_ids
ddict
{'ScenarioMIP.IPSL.IPSL-CM6A-LR.ssp585.Omon.gn': <xarray.Dataset>
Dimensions: (axis_nbounds: 2, member_id: 1, nvertex: 4, olevel: 75, time: 1032, x: 362, y: 332)
Coordinates:
nav_lat (y, x) float32 dask.array<chunksize=(332, 362), meta=np.ndarray>
nav_lon (y, x) float32 dask.array<chunksize=(332, 362), meta=np.ndarray>
* time (time) datetime64[ns] 2015-01-16T12:00:00 ... 2100-12-16T12:00:00
* olevel (olevel) float32 0.50576 1.5558553 ... 5698.0605 5902.0576
* member_id (member_id) <U8 'r2i1p1f1'
Dimensions without coordinates: axis_nbounds, nvertex, x, y
Data variables:
bounds_nav_lon (time, y, x, nvertex) float32 dask.array<chunksize=(1032, 332, 362, 4), meta=np.ndarray>
bounds_nav_lat (time, y, x, nvertex) float32 dask.array<chunksize=(1032, 332, 362, 4), meta=np.ndarray>
time_bounds (time, axis_nbounds) datetime64[ns] dask.array<chunksize=(6, 2), meta=np.ndarray>
olevel_bounds (time, olevel, axis_nbounds) float32 dask.array<chunksize=(1032, 75, 2), meta=np.ndarray>
area (time, y, x) float32 dask.array<chunksize=(1032, 332, 362), meta=np.ndarray>
o2 (member_id, time, olevel, y, x) float32 dask.array<chunksize=(1, 6, 75, 332, 362), meta=np.ndarray>
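To make the mismatch explicit, the time axes of the two members can be compared directly. A quick sketch; ddict_r1 and ddict_r2 are hypothetical names for the two dictionaries loaded above (both were simply called ddict in the snippets):

# Datasets from the two separate calls above (hypothetical variable names).
ds_r1 = ddict_r1['ScenarioMIP.IPSL.IPSL-CM6A-LR.ssp585.Omon.gn']  # r1i1p1f1
ds_r2 = ddict_r2['ScenarioMIP.IPSL.IPSL-CM6A-LR.ssp585.Omon.gn']  # r2i1p1f1

print(ds_r1.time.size, ds_r1.time[-1].values)  # 3432, 2300-12-16 12:00:00
print(ds_r2.time.size, ds_r2.time[-1].values)  # 1032, 2100-12-16T12:00:00

# Note: the extended member is decoded to cftime objects (its dates run past
# 2262, the datetime64[ns] limit), while the shorter member uses datetime64,
# so even the dtypes of the two time coordinates differ.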
It seems that intake-esm struggles with merging these datasets. In the cloud storage the extended member was removed, so the equivalent call works there (cc @naomi-henderson), but I would really be interested in using the extended data.
Is there a way to make this work? I thought it might be possible to alter the attrs of the datasets via preprocessing, but had no luck getting that to work. In principle these runs are arguably mislabelled and should be a separate experiment, e.g. ssp585-extended or similar.
Any suggestions for solving this?
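One workaround that might work is to load the members one at a time, trim the extended run back to the common 2015–2100 window by value, and concatenate along member_id manually. A sketch only: the member list below is hypothetical, and it relies on the use_cftime option of xr.open_dataset so every member gets the same (cftime) time type.

import xarray as xr

# Hypothetical member list; adjust to what col.search() actually returns.
members = ['r1i1p1f1', 'r2i1p1f1', 'r3i1p1f1']

dsets = []
for member in members:
    cat_m = col.search(source_id='IPSL-CM6A-LR', variable_id='o2',
                       experiment_id='ssp585', member_id=member)
    ds = list(cat_m.to_dataset_dict(
        # use_cftime=True is passed through to xr.open_dataset so that the
        # extended member (which runs past 2262) and the shorter members all
        # get cftime time axes instead of a mix of cftime and datetime64.
        cdf_kwargs={'chunks': {'time': 6}, 'decode_times': True,
                    'use_cftime': True}
    ).values())[0]
    # Trim everything to the common 2015-2100 window, selecting by value so
    # the extension files of r1i1p1f1 are dropped rather than mis-sliced.
    dsets.append(ds.sel(time=slice('2015', '2100')))

# Stack the single-member datasets along the existing member_id dimension;
# this assumes all members share the same calendar and monthly timestamps.
combined = xr.concat(dsets, dim='member_id')

The same time selection could in principle go into the preprocess= argument of to_dataset_dict (as far as I understand, it is applied to each file as it is opened), which would keep the original single call. Either way, this only addresses the differing run lengths; it would not remove timestamps that are genuinely duplicated within one member, which turned out to be a separate issue (see below).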
About this issue
- State: closed
- Created 4 years ago
- Comments: 46 (32 by maintainers)
Dask will fix this: https://github.com/dask/dask/pull/6514
It looks at which version number is higher.
Not yet. I am looking into the csv you just sent me to make sure that the --pick-latest-version flag is working as expected.

@jbusecke It looks like your catalog includes both versions of this file:

'/tiger/scratch/gpfs/GEOCLIM/synda/data/CMIP6/ScenarioMIP/IPSL/IPSL-CM6A-LR/ssp585/r1i1p1f1/Omon/o2/gn/v20190119/o2_Omon_IPSL-CM6A-LR_ssp585_r1i1p1f1_gn_201501-210012.nc'
'/tiger/scratch/gpfs/GEOCLIM/synda/data/CMIP6/ScenarioMIP/IPSL/IPSL-CM6A-LR/ssp585/r1i1p1f1/Omon/o2/gn/v20190903/o2_Omon_IPSL-CM6A-LR_ssp585_r1i1p1f1_gn_201501-210012.nc'

It looks like that's where the duplication in time is happening. You might want to make sure it's not using version v20190119, because the other files in that time series are using v20190903.
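If it helps, the duplicated entries can also be spotted (and filtered out) directly on the catalog's underlying pandas DataFrame. A sketch, assuming the tigressdata catalog uses the usual CMIP6 column names (version, path, member_id, etc.), which may need adjusting:

df = col.df  # intake-esm exposes the catalog as a pandas DataFrame

# Everything except the version and path columns identifies one file slot.
keys = [c for c in df.columns if c not in ('version', 'path')]

# Rows that are present in more than one version:
dups = df[df.duplicated(subset=keys, keep=False)]
print(dups[['member_id', 'version', 'path']].sort_values('version'))

# Keep only the newest version of each file; the vYYYYMMDD tags sort
# correctly as plain strings, e.g. 'v20190903' > 'v20190119'.
latest = df.sort_values('version').drop_duplicates(subset=keys, keep='last')

Writing latest back out as the catalog csv (or rebuilding the catalog with the --pick-latest-version option mentioned above) should get rid of the duplicated timestamps.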