dask: regression in a custom groupby.agg when split_out is not None

What happened:

It seems that the changes in #8468 broke the custom aggregation in dask_geopandas, where we implement our own function to be able to aggregate geometries. When split_out is specified in the .agg method, it currently raises an error. With split_out=None everything works fine, so I think this is an issue on the dask side, not on the dask_geopandas side.

What you expected to happen:

I expect the aggregation to work whether or not I specify split_out.

Minimal Complete Verifiable Example:

This example requires geopandas and dask_geopandas; I wasn't able to craft one with pure dask.

```py
import geopandas
import dask.dataframe as dd
import dask_geopandas

world = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(world, npartitions=4)


def union(block):
    merged_geom = block.unary_union
    return merged_geom


merge_geometries = dd.Aggregation(
    "merge_geometries", lambda s: s.agg(union), lambda s0: s0.agg(union)
)

aggfunc = "first"  # placeholder reducer for the non-geometry columns
data_agg = {col: aggfunc for col in ddf.columns.drop([ddf.geometry.name])}
data_agg[ddf.geometry.name] = merge_geometries

# this works
# aggregated = ddf.groupby(by="continent").agg(data_agg)

# this doesn't
aggregated = ddf.groupby(by="continent").agg(data_agg, split_out=2)
```

Traceback:

<details>
<summary>Traceback</summary>

```py
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
   3360             try:
-> 3361                 return self._engine.get_loc(casted_key)
   3362             except KeyError as err:

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

KeyError: 'geometry'

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
~/Git/geopandas/geopandas/geodataframe.py in set_geometry(self, col, drop, inplace, crs)
    310         try:
--> 311             level = frame[col]
    312         except KeyError:

~/Git/geopandas/geopandas/geodataframe.py in __getitem__(self, key)
   1349         """
-> 1350         result = super().__getitem__(key)
   1351         geo_col = self._geometry_column_name

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/core/frame.py in __getitem__(self, key)
   3457             return self._getitem_multilevel(key)
-> 3458         indexer = self.columns.get_loc(key)
   3459         if is_integer(indexer):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
   3362             except KeyError as err:
-> 3363                 raise KeyError(key) from err
   3364

KeyError: 'geometry'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    174     try:
--> 175         yield
    176     except Exception as e:

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _emulate(func, udf, *args, **kwargs)
   5965     with raise_on_meta_error(funcname(func), udf=udf):
-> 5966         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5967

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
   5945     elif isinstance(x, tuple):
-> 5946         return tuple(_extract_meta(_x, nonempty) for _x in x)
   5947     elif isinstance(x, dict):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in <genexpr>(.0)
   5945     elif isinstance(x, tuple):
-> 5946         return tuple(_extract_meta(_x, nonempty) for _x in x)
   5947     elif isinstance(x, dict):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
   5941     if isinstance(x, (Scalar, _Frame)):
-> 5942         return x._meta_nonempty if nonempty else x._meta
   5943     elif isinstance(x, list):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _meta_nonempty(self)
    430         """A non-empty version of _meta with fake data."""
--> 431         return meta_nonempty(self._meta)
    432

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/utils.py in __call__(self, arg, *args, **kwargs)
    623         meth = self.dispatch(type(arg))
--> 624         return meth(arg, *args, **kwargs)
    625

~/Git/dask-geopandas/dask_geopandas/backends.py in _nonempty_geodataframe(x)
     59     df = meta_nonempty_dataframe(x)
---> 60     return geopandas.GeoDataFrame(df, geometry=x._geometry_column_name, crs=x.crs)
     61

~/Git/geopandas/geopandas/geodataframe.py in __init__(self, data, geometry, crs, *args, **kwargs)
    203                 # TODO: raise error in 0.9 or 0.10.
--> 204                 self.set_geometry(geometry, inplace=True)
    205

~/Git/geopandas/geopandas/geodataframe.py in set_geometry(self, col, drop, inplace, crs)
    312         except KeyError:
--> 313             raise ValueError("Unknown column %s" % col)
    314         except Exception:

ValueError: Unknown column geometry

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
/var/folders/2f/fhks6w_d0k556plcv3rfmshw0000gn/T/ipykernel_44916/2101873994.py in <module>
     12 data_agg = {col: aggfunc for col in ddf.columns.drop([ddf.geometry.name])}
     13 data_agg[ddf.geometry.name] = merge_geometries
---> 14 aggregated = ddf.groupby(by="continent").agg(data_agg, split_out=2)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/groupby.py in agg(self, arg, split_every, split_out)
   2006     @derived_from(pd.core.groupby.DataFrameGroupBy)
   2007     def agg(self, arg, split_every=None, split_out=1):
-> 2008         return self.aggregate(arg, split_every=split_every, split_out=split_out)
   2009
   2010

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   2002             return self.size()
   2003
-> 2004         return super().aggregate(arg, split_every=split_every, split_out=split_out)
   2005
   2006     @derived_from(pd.core.groupby.DataFrameGroupBy)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1619         )
   1620
-> 1621         return aca(
   1622             chunk_args,
   1623             chunk=_groupby_apply_funcs,

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
   5878     # Blockwise Split Layer
   5879     if split_out and split_out > 1:
-> 5880         chunked = chunked.map_partitions(
   5881             hash_shard,
   5882             split_out,

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in map_partitions(self, func, *args, **kwargs)
    769         None as the division.
    770         """
--> 771         return map_partitions(func, self, *args, **kwargs)
    772
    773     @insert_meta_param_description(pad=12)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in map_partitions(func, meta, enforce_metadata, transform_divisions, align_dataframes, *args, **kwargs)
   6033         # Use non-normalized kwargs here, as we want the real values (not
   6034         # delayed values)
-> 6035         meta = _emulate(func, *args, udf=True, **kwargs)
   6036     else:
   6037         meta = make_meta(meta, index=meta_index, parent_meta=parent_meta)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _emulate(func, udf, *args, **kwargs)
   5964     """
   5965     with raise_on_meta_error(funcname(func), udf=udf):
-> 5966         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5967
   5968

~/mambaforge/envs/geo_dev/lib/python3.9/contextlib.py in __exit__(self, typ, value, traceback)
    135                 value = typ()
    136             try:
--> 137                 self.gen.throw(typ, value, traceback)
    138             except StopIteration as exc:
    139                 # Suppress StopIteration unless it's the same exception that

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    194         )
    195         msg = msg.format(f" in {funcname}" if funcname else "", repr(e), tb)
--> 196         raise ValueError(msg) from e
    197
    198

ValueError: Metadata inference failed in hash_shard.

You have supplied a custom function and Dask is unable
to determine the type of output that that function returns.

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:

ValueError('Unknown column geometry')

Traceback:

  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/utils.py", line 175, in raise_on_meta_error
    yield
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5966, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5946, in _extract_meta
    return tuple(_extract_meta(_x, nonempty) for _x in x)
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5946, in <genexpr>
    return tuple(_extract_meta(_x, nonempty) for _x in x)
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5942, in _extract_meta
    return x._meta_nonempty if nonempty else x._meta
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 431, in _meta_nonempty
    return meta_nonempty(self._meta)
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/utils.py", line 624, in __call__
    return meth(arg, *args, **kwargs)
  File "/Users/martin/Git/dask-geopandas/dask_geopandas/backends.py", line 60, in _nonempty_geodataframe
    return geopandas.GeoDataFrame(df, geometry=x._geometry_column_name, crs=x.crs)
  File "/Users/martin/Git/geopandas/geopandas/geodataframe.py", line 204, in __init__
    self.set_geometry(geometry, inplace=True)
  File "/Users/martin/Git/geopandas/geopandas/geodataframe.py", line 313, in set_geometry
    raise ValueError("Unknown column %s" % col)
```

</details>

**Anything else we need to know?**:

**Environment**:

- Dask version: '2022.01.0+17.g08eee6d6'
- Python version: 3.9.7
- Operating System: macOS, ubuntu
- Install method (conda, pip, source): pip + git


About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 15 (14 by maintainers)

Most upvoted comments

It's probably not a regression in dask itself, but an existing issue with the interaction with geopandas.GeoDataFrame being a special subclass.

What changed in #8468 is that apply_concat_apply started to use map_partitions. And with split_out specified, it calls map_partitions a second time, which I think is what triggers the direct error. But I think it's actually already caused by the first call of map_partitions. In our case, the dissolve method is basically a wrapper around groupby, and the function being mapped is _groupby_apply_funcs, which has this part in the code:

https://github.com/dask/dask/blob/08eee6d655459e9b8494d67957b6fe908e1b8c67/dask/dataframe/groupby.py#L929-L930

where result is a dictionary of columns, but with renamed column names.
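The shape of that reconstruction can be sketched in plain pandas (the `-suffix` renaming below is made up for illustration and is not dask's actual naming scheme):

```python
import pandas as pd

df = pd.DataFrame({"pop_est": [1, 2], "geometry": ["poly1", "poly2"]})

# The chunk function collects each result column into a dict under a
# renamed key, then rebuilds the frame via df.__class__(result):
result = {col + "-suffix": df[col] for col in df.columns}
rebuilt = df.__class__(result)

print(list(rebuilt.columns))  # ['pop_est-suffix', 'geometry-suffix']
```

For a plain DataFrame this round-trip is harmless; for a GeoDataFrame the subclass's notion of which column is the geometry no longer matches any column name.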

A very simple example that reproduces the underlying issue (without involving groupby / apply_concat_apply):

```py
import geopandas
import dask_geopandas

df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(df, npartitions=4)


def func(df):
    result = {}
    for col in df.columns:
        result["prefix-" + col] = df[col]
    # naively recreating the class, without accounting for the renamed geometry column
    return df.__class__(result)
```

```py
>>> res = ddf.map_partitions(func)
>>> res
Dask GeoDataFrame Structure:
              prefix-pop_est prefix-continent prefix-name prefix-iso_a3 prefix-gdp_md_est prefix-geometry
npartitions=4
0                      int64           object      object        object           float64        geometry
45                       ...              ...         ...           ...               ...             ...
90                       ...              ...         ...           ...               ...             ...
135                      ...              ...         ...           ...               ...             ...
176                      ...              ...         ...           ...               ...             ...
Dask Name: func, 8 tasks

>>> res._meta
Empty GeoDataFrame
Columns: [prefix-pop_est, prefix-continent, prefix-name, prefix-iso_a3, prefix-gdp_md_est, prefix-geometry]
Index: []

>>> res._meta._geometry_column_name
'geometry'
```

So we get a dask_geopandas GeoDataFrame where the _meta is an “invalid” GeoDataFrame, because the geometry column name is not correctly set.
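The failure mode can be mimicked without geopandas, using a small DataFrame subclass that carries one extra attribute the way GeoDataFrame carries `_geometry_column_name` (`TaggedFrame` and `_tag_column` are invented names for this sketch):

```python
import pandas as pd


class TaggedFrame(pd.DataFrame):
    """Minimal stand-in for GeoDataFrame: tracks one 'special' column by name."""

    _metadata = ["_tag_column"]  # attributes pandas propagates via __finalize__

    @property
    def _constructor(self):
        return TaggedFrame


df = TaggedFrame({"a": [1, 2], "geom": ["x", "y"]})
df._tag_column = "geom"

# Rebuild from a dict of renamed columns, then propagate the subclass
# metadata -- roughly what happens when the meta for the mapped function
# is constructed.
renamed = df.__class__(
    {"prefix-" + col: df[col] for col in df.columns}
).__finalize__(df)

print(renamed._tag_column)        # 'geom'
print("geom" in renamed.columns)  # False -> stale, "invalid" metadata
```

The subclass still claims its special column is `geom`, but no such column exists any more, which is exactly the state `res._meta` is in above.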

Since dask cannot know that it should handle GeoDataFrame construction in a special way inside _groupby_apply_funcs, this is probably something to discuss on the geopandas side on how we can deal with this.