dask: regression in a custom groupby.agg when split_out is not None
**What happened**:

It seems that the changes in #8468 broke the custom aggregation in dask_geopandas, where we implement our own function to be able to aggregate geometries. When specifying `split_out` in the `.agg` method, it currently raises an error. With `split_out=None` everything works fine, so I think this is an issue on the dask side, not the dask_geopandas side.

**What you expected to happen**:

I expect the aggregation to work whether or not I specify `split_out`.

**Minimal Complete Verifiable Example**:

This example requires geopandas and dask_geopandas; I wasn't able to craft one with pure dask.
```python
import geopandas
import dask.dataframe as dd
import dask_geopandas

world = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(world, npartitions=4)


def union(block):
    merged_geom = block.unary_union
    return merged_geom


merge_geometries = dd.Aggregation(
    "merge_geometries", lambda s: s.agg(union), lambda s0: s0.agg(union)
)

# any standard aggregation for the non-geometry columns
# (the definition of aggfunc was not part of the original snippet)
aggfunc = "first"

data_agg = {col: aggfunc for col in ddf.columns.drop([ddf.geometry.name])}
data_agg[ddf.geometry.name] = merge_geometries

# this works
# aggregated = ddf.groupby(by="continent").agg(data_agg)
# this doesn't
aggregated = ddf.groupby(by="continent").agg(data_agg, split_out=2)
```
**Traceback**:

<details>
<summary>Full traceback</summary>

```
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

KeyError: 'geometry'

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
~/Git/geopandas/geopandas/geodataframe.py in set_geometry(self, col, drop, inplace, crs)
    310         try:
--> 311             level = frame[col]
    312         except KeyError:

~/Git/geopandas/geopandas/geodataframe.py in __getitem__(self, key)
   1349         """
-> 1350         result = super().__getitem__(key)
   1351         geo_col = self._geometry_column_name

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/core/frame.py in __getitem__(self, key)
   3457             return self._getitem_multilevel(key)
-> 3458         indexer = self.columns.get_loc(key)
   3459         if is_integer(indexer):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
   3362         except KeyError as err:
-> 3363             raise KeyError(key) from err
   3364

KeyError: 'geometry'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    174     try:
--> 175         yield
    176     except Exception as e:

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _emulate(func, *args, udf=False, **kwargs)
   5965     with raise_on_meta_error(funcname(func), udf=udf):
-> 5966         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5967

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
   5945     elif isinstance(x, tuple):
-> 5946         return tuple(_extract_meta(_x, nonempty) for _x in x)
   5947     elif isinstance(x, dict):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in <genexpr>(.0)
   5945     elif isinstance(x, tuple):
-> 5946         return tuple(_extract_meta(_x, nonempty) for _x in x)
   5947     elif isinstance(x, dict):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
   5941     if isinstance(x, (Scalar, _Frame)):
-> 5942         return x._meta_nonempty if nonempty else x._meta
   5943     elif isinstance(x, list):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _meta_nonempty(self)
    430         """A non-empty version of _meta with fake data."""
--> 431         return meta_nonempty(self._meta)
    432

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/utils.py in __call__(self, arg, *args, **kwargs)
    623         meth = self.dispatch(type(arg))
--> 624         return meth(arg, *args, **kwargs)
    625

~/Git/dask-geopandas/dask_geopandas/backends.py in _nonempty_geodataframe(x)
     59     df = meta_nonempty_dataframe(x)
---> 60     return geopandas.GeoDataFrame(df, geometry=x._geometry_column_name, crs=x.crs)
     61

~/Git/geopandas/geopandas/geodataframe.py in __init__(self, data, geometry, crs, *args, **kwargs)
    203         # TODO: raise error in 0.9 or 0.10.
--> 204             self.set_geometry(geometry, inplace=True)
    205

~/Git/geopandas/geopandas/geodataframe.py in set_geometry(self, col, drop, inplace, crs)
    312         except KeyError:
--> 313             raise ValueError("Unknown column %s" % col)
    314         except Exception:

ValueError: Unknown column geometry

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
/var/folders/2f/fhks6w_d0k556plcv3rfmshw0000gn/T/ipykernel_44916/2101873994.py in <module>
     12 data_agg = {col: aggfunc for col in ddf.columns.drop([ddf.geometry.name])}
     13 data_agg[ddf.geometry.name] = merge_geometries
---> 14 aggregated = ddf.groupby(by="continent").agg(data_agg, split_out=2)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/groupby.py in agg(self, arg, split_every, split_out)
   2006     @derived_from(pd.core.groupby.DataFrameGroupBy)
   2007     def agg(self, arg, split_every=None, split_out=1):
-> 2008         return self.aggregate(arg, split_every=split_every, split_out=split_out)
   2009
   2010

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   2002             return self.size()
   2003
-> 2004         return super().aggregate(arg, split_every=split_every, split_out=split_out)
   2005
   2006     @derived_from(pd.core.groupby.DataFrameGroupBy)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1619         )
   1620
-> 1621         return aca(
   1622             chunk_args,
   1623             chunk=_groupby_apply_funcs,

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
   5878     # Blockwise Split Layer
   5879     if split_out and split_out > 1:
-> 5880         chunked = chunked.map_partitions(
   5881             hash_shard,
   5882             split_out,

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in map_partitions(self, func, *args, **kwargs)
    769         None as the division.
    770         """
--> 771         return map_partitions(func, self, *args, **kwargs)
    772
    773     @insert_meta_param_description(pad=12)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in map_partitions(func, *args, meta, enforce_metadata, transform_divisions, align_dataframes, **kwargs)
   6033         # Use non-normalized kwargs here, as we want the real values (not
   6034         # delayed values)
-> 6035         meta = _emulate(func, *args, udf=True, **kwargs)
   6036     else:
   6037         meta = make_meta(meta, index=meta_index, parent_meta=parent_meta)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _emulate(func, *args, udf=False, **kwargs)
   5964     """
   5965     with raise_on_meta_error(funcname(func), udf=udf):
-> 5966         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5967
   5968

~/mambaforge/envs/geo_dev/lib/python3.9/contextlib.py in __exit__(self, typ, value, traceback)
    135                 value = typ()
    136             try:
--> 137                 self.gen.throw(typ, value, traceback)
    138             except StopIteration as exc:
    139                 # Suppress StopIteration unless it's the same exception that

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    194             )
    195         msg = msg.format(f" in {funcname}" if funcname else "", repr(e), tb)
--> 196         raise ValueError(msg) from e
    197
    198

ValueError: Metadata inference failed in `hash_shard`.

You have supplied a custom function and Dask is unable to determine the
type of output that that function returns.

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
ValueError('Unknown column geometry')

Traceback:
---------
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/utils.py", line 175, in raise_on_meta_error
    yield
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5966, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5946, in _extract_meta
    return tuple(_extract_meta(_x, nonempty) for _x in x)
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5946, in <genexpr>
    return tuple(_extract_meta(_x, nonempty) for _x in x)
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5942, in _extract_meta
    return x._meta_nonempty if nonempty else x._meta
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 431, in _meta_nonempty
    return meta_nonempty(self._meta)
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/utils.py", line 624, in __call__
    return meth(arg, *args, **kwargs)
  File "/Users/martin/Git/dask-geopandas/dask_geopandas/backends.py", line 60, in _nonempty_geodataframe
    return geopandas.GeoDataFrame(df, geometry=x._geometry_column_name, crs=x.crs)
  File "/Users/martin/Git/geopandas/geopandas/geodataframe.py", line 204, in __init__
    self.set_geometry(geometry, inplace=True)
  File "/Users/martin/Git/geopandas/geopandas/geodataframe.py", line 313, in set_geometry
    raise ValueError("Unknown column %s" % col)
```

</details>
**Anything else we need to know?**:
**Environment**:
- Dask version: '2022.01.0+17.g08eee6d6'
- Python version: 3.9.7
- Operating System: macOS, ubuntu
- Install method (conda, pip, source): pip + git
**About this issue**
- Original URL
- State: closed
- Created 2 years ago
- Comments: 15 (14 by maintainers)
It's probably not a regression in dask, but an existing issue with the interaction with geopandas.GeoDataFrame being a special subclass.
What changed in #8468 is that `apply_concat_apply` started to use `map_partitions`. And with `split_out` specified, it calls `map_partitions` a second time, which I think is what triggers the direct error. But I think it is actually already caused by the first call of `map_partitions`. In our case, the `dissolve` method is basically a wrapper around `groupby`, and the function being mapped is `_groupby_apply_funcs`, which has this part in the code: https://github.com/dask/dask/blob/08eee6d655459e9b8494d67957b6fe908e1b8c67/dask/dataframe/groupby.py#L929-L930
where `result` is a dictionary of columns, but with renamed column names. A very simple example that reproduces the underlying issue (without involving groupby / `apply_concat_apply`):
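The code of that simple example did not survive formatting here. As a stand-in, the mechanism can be sketched with a pure-pandas toy subclass; `TaggedFrame`, `set_tag`, and `_tag_column_name` are made-up names playing the role of `geopandas.GeoDataFrame`, `set_geometry`, and `_geometry_column_name`:

```python
import pandas as pd


class TaggedFrame(pd.DataFrame):
    """Toy stand-in for geopandas.GeoDataFrame: remembers one "special"
    column and validates it on assignment, like set_geometry does."""

    _metadata = ["_tag_column_name"]  # propagated by __finalize__
    _tag_column_name = None

    @property
    def _constructor(self):
        return TaggedFrame

    def set_tag(self, col):
        if col not in self.columns:
            raise ValueError("Unknown column %s" % col)
        self._tag_column_name = col
        return self


frame = TaggedFrame({"geometry": [1, 2], "pop": [3, 4]}).set_tag("geometry")

# _groupby_apply_funcs rebuilds the frame from a dict of *renamed* columns,
# calling the subclass constructor on it:
renamed = {col + "-agg": frame[col] for col in frame.columns}
rebuilt = type(frame)(renamed).__finalize__(frame)

# The remembered column name survives, but the column itself was renamed
# away, so the rebuilt frame's metadata points at a non-existent column.
# Re-validating it fails just like the non-empty meta construction does
# for the GeoDataFrame.
try:
    rebuilt.set_tag(rebuilt._tag_column_name)
except ValueError as err:
    print(err)  # Unknown column geometry
```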
So we get a dask_geopandas GeoDataFrame where the `_meta` is an "invalid" GeoDataFrame, because the geometry column name is not correctly set. Since dask cannot know that it should handle GeoDataFrame construction in a special way inside `_groupby_apply_funcs`, this is probably something to discuss on the geopandas side, i.e. how we can deal with this.