cudf: [BUG] Groupby collect list fails with Dask

cuDF recently implemented groupby collect list. I’d like to be able to use this with Dask, like I can on the CPU. Currently, Dask’s collect list goes down a codepath that requires iterating through the object, which cuDF explicitly does not permit (https://github.com/rapidsai/cudf/issues/7481). We may want to explore this in Dask or special-case it in Dask cuDF.

import cudf
import dask_cudf
import pandas as pd
import dask.dataframe as dd

from io import StringIO

data = """a,b
1595802,1611:0.92
1595802,1610:0.07
1524246,1807:0.92
1524246,1608:0.07"""

df = pd.read_csv(StringIO(data))
ddf = dd.from_pandas(df, 2)

gdf = cudf.from_pandas(df)
gddf = dask_cudf.from_cudf(gdf, 2)

print(ddf.groupby("a").agg({"b":list}).compute()) # works as expected
print(gddf.groupby("a").agg({"b":list}).compute())
                              b
a                              
1595802  [1611:0.92, 1610:0.07]
1524246  [1807:0.92, 1608:0.07]
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    179     try:
--> 180         yield
    181     except Exception as e:

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   5507     with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 5508         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5509 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in _groupby_apply_funcs(df, *index, **kwargs)
    920     for result_column, func, func_kwargs in funcs:
--> 921         r = func(grouped, **func_kwargs)
    922 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in _apply_func_to_column(df_like, column, func)
    966 
--> 967     return func(df_like[column])
    968 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in <lambda>(s)
    843                 _apply_func_to_column,
--> 844                 dict(column=input_column, func=lambda s: s.apply(list)),
    845             )

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py in apply(self, function)
    422         ]
--> 423         chunk_results = [function(chk) for chk in chunks]
    424 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py in <listcomp>(.0)
    422         ]
--> 423         chunk_results = [function(chk) for chk in chunks]
    424 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/series.py in __iter__(self)
   1189     def __iter__(self):
-> 1190         cudf.utils.utils.raise_iteration_error(obj=self)
   1191 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/utils/utils.py in raise_iteration_error(obj)
    358     raise TypeError(
--> 359         f"{obj.__class__.__name__} object is not iterable. "
    360         f"Consider using `.to_arrow()`, `.to_pandas()` or `.values_host` "

TypeError: Series object is not iterable. Consider using `.to_arrow()`, `.to_pandas()` or `.values_host` if you wish to iterate over the values.

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
<ipython-input-16-08ab17754440> in <module>
     22 
     23 print(ddf.groupby("a").agg({"b":list}).compute()) # works as expected
---> 24 print(gddf.groupby("a").agg({"b":list}).compute())

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in agg(self, arg, split_every, split_out)
   1846     @derived_from(pd.core.groupby.DataFrameGroupBy)
   1847     def agg(self, arg, split_every=None, split_out=1):
-> 1848         return self.aggregate(arg, split_every=split_every, split_out=split_out)
   1849 
   1850 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask_cudf/groupby.py in aggregate(self, arg, split_every, split_out)
     81 
     82         return super().aggregate(
---> 83             arg, split_every=split_every, split_out=split_out
     84         )
     85 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1842             return self.size()
   1843 
-> 1844         return super().aggregate(arg, split_every=split_every, split_out=split_out)
   1845 
   1846     @derived_from(pd.core.groupby.DataFrameGroupBy)

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1622             split_out=split_out,
   1623             split_out_setup=split_out_on_index,
-> 1624             sort=self.sort,
   1625         )
   1626 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
   5459 
   5460     if meta is no_default:
-> 5461         meta_chunk = _emulate(chunk, *args, udf=True, **chunk_kwargs)
   5462         meta = _emulate(
   5463             aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   5506     """
   5507     with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 5508         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5509 
   5510 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/contextlib.py in __exit__(self, type, value, traceback)
    128                 value = type()
    129             try:
--> 130                 self.gen.throw(type, value, traceback)
    131             except StopIteration as exc:
    132                 # Suppress StopIteration *unless* it's the same exception that

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    199         )
    200         msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 201         raise ValueError(msg) from e
    202 
    203 

ValueError: Metadata inference failed in `_groupby_apply_funcs`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
TypeError('Series object is not iterable. Consider using `.to_arrow()`, `.to_pandas()` or `.values_host` if you wish to iterate over the values.')

Traceback:
---------
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/utils.py", line 180, in raise_on_meta_error
    yield
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py", line 5508, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 921, in _groupby_apply_funcs
    r = func(grouped, **func_kwargs)
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 967, in _apply_func_to_column
    return func(df_like[column])
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 844, in <lambda>
    dict(column=input_column, func=lambda s: s.apply(list)),
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py", line 423, in apply
    chunk_results = [function(chk) for chk in chunks]
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py", line 423, in <listcomp>
    chunk_results = [function(chk) for chk in chunks]
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/series.py", line 1190, in __iter__
    cudf.utils.utils.raise_iteration_error(obj=self)
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/utils/utils.py", line 359, in raise_iteration_error
    f"{obj.__class__.__name__} object is not iterable. "
!conda list | grep "rapids\|dask\|numpy\|cupy\|arrow\|pandas"
# packages in environment at /raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331:
arrow-cpp                 1.0.1           py37h2318771_14_cuda    conda-forge
arrow-cpp-proc            3.0.0                      cuda    conda-forge
cudf                      0.19.0a210331   cuda_10.2_py37_gc99fcef41b_313    rapidsai-nightly
cuml                      0.19.0a210331   cuda10.2_py37_g83168076c_138    rapidsai-nightly
cupy                      8.6.0            py37h7fc54ca_0    conda-forge
dask                      2021.3.1           pyhd8ed1ab_0    conda-forge
dask-core                 2021.3.1           pyhd8ed1ab_0    conda-forge
dask-cuda                 0.19.0a210331           py37_45    rapidsai-nightly
dask-cudf                 0.19.0a210331   py37_gc99fcef41b_313    rapidsai-nightly
libcudf                   0.19.0a210331   cuda10.2_gbe2f0c000f_314    rapidsai-nightly
libcuml                   0.19.0a210331   cuda10.2_g83168076c_138    rapidsai-nightly
libcumlprims              0.19.0a210316   cuda10.2_ge7e82a0_12    rapidsai-nightly
librmm                    0.19.0a210331   cuda10.2_g9d1ba02_50    rapidsai-nightly
numpy                     1.19.5           py37haa41c4c_1    conda-forge
pandas                    1.2.3            py37hdc94413_0    conda-forge
pyarrow                   1.0.1           py37hbeecfa9_14_cuda    conda-forge
rmm                       0.19.0a210331   cuda_10.2_py37_g9d1ba02_50    rapidsai-nightly
ucx                       1.9.0+gcd9efd3       cuda10.2_0    rapidsai-nightly
ucx-proc                  1.0.0                       gpu    rapidsai-nightly
ucx-py                    0.19.0a210331   py37_gcd9efd3_46    rapidsai-nightly

Most upvoted comments

I’m actually using list in the aggregation again, which redirects to Aggregation.collect() (it looks like the internals have been updated recently, so _AggregationFactory is now Aggregation):

https://github.com/rapidsai/cudf/blob/025c56ab1c4046f0f3d1d3ec768b1a69755643c8/python/cudf/cudf/_lib/aggregation.pyx#L284-L286
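As a quick sanity check (this sketch is not from the original thread), passing the builtin list callable to a cudf groupby should behave like the "collect" aggregation; exact support depends on the cudf version:

import cudf

gdf = cudf.DataFrame({"a": [1, 1, 2], "b": [10, 20, 30]})

# Both specs are expected to route to the same collect aggregation,
# producing one list of b-values per group.
print(gdf.groupby("a").agg({"b": "collect"}))
print(gdf.groupby("a").agg({"b": list}))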

In case any questions come up, that change was mine. You can see the diff in #7818 if that helps you find something you were used to looking for elsewhere, and feel free to ping me if anything there is confusing.

Yeah - good thought, Nick. It would be great if there were a simple way to remove the outermost offsets.

If there turns out to be no convenient way to “flatten” the list column in the way we need, we can still use explode. However, since the row count is not preserved, we would need to propagate separate dataframes through the reduction tree for each of the specified list aggregations. I have confirmed in some toy code that this works, but the logic will need to be pretty messy to handle a general use case. So, I’d much prefer a simple cudf utility/primitive 😃

Here is the (ugly) toy example :)

import cudf
from io import StringIO

data = """a,b,c
1595802,1611:0.92,0
1595802,1610:0.07,1
1524246,1807:0.92,0
1524246,1608:0.07,1"""

# Read and split (proxy for dask_cudf)
gdf = cudf.read_csv(StringIO(data))
gdf1 = gdf[:3]
gdf2 = gdf[3:]

# Groupby on each partition
gb1_0 = gdf1.groupby("a").agg({"b":list, "c": [list, "sum"]}).reset_index()
gb2_0 = gdf2.groupby("a").agg({"b":list, "c": [list, "sum"]}).reset_index()

# Flatten Column Names
gb1_0.columns = [f"{col}_{agg}" if agg else col for col, agg in gb1_0.columns]
gb2_0.columns = [f"{col}_{agg}" if agg else col for col, agg in gb2_0.columns]

# Explode the output of each partition for list columns
gb1_0_b = gb1_0[["a", "b_list"]].explode("b_list")
gb1_0_c = gb1_0[["a", "c_list"]].explode("c_list")
gb1_0.drop(columns=["b_list", "c_list"], inplace=True)

gb2_0_b = gb2_0[["a", "b_list"]].explode("b_list")
gb2_0_c = gb2_0[["a", "c_list"]].explode("c_list")
gb2_0.drop(columns=["b_list", "c_list"], inplace=True)

# Concatenate partition-wise results and perform another list agg
result = cudf.concat([gb1_0, gb2_0], ignore_index=True).reset_index(drop=True)
result = result.groupby("a").agg({"c_sum": ["sum"]})
result.columns = ["c_sum"]

gdf_b = cudf.concat([gb1_0_b, gb2_0_b], ignore_index=True).reset_index(drop=True)
gdf_c = cudf.concat([gb1_0_c, gb2_0_c], ignore_index=True).reset_index(drop=True)
gdf_bc = cudf.concat([gdf_b, gdf_c["c_list"]], axis=1)
result_bc = gdf_bc.groupby("a").agg({"b_list":list, "c_list": list})

# "Final" Result
cudf.concat([result, result_bc[["b_list", "c_list"]]], axis=1)

I currently have a naive approach to _tree_node_agg, which uses _AggregationFactory.collect to combine the partitions.

Is there a reason you need to use _AggregationFactory.collect explicitly, rather than just including list in the aggregation again? I know you will get a nested list, but it seems that _AggregationFactory.collect gives you this as well?

Regarding list-column flattening: I think this is the key challenge for supporting list aggregation 😃. It may make sense to add an arg to _tree_node_agg and _finalize_gb_agg to specify that the input should be flattened (if a “*_list” column is present), and then you can add a single utility function to perform this flattening. (Note that I haven’t thought through this, so my understanding may be naive).

EDIT: It looks like df['col_list'].explode() may flatten the column for you.
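For illustration, a minimal sketch of that flattening on a hand-built nested column (whether the cudf constructor accepts doubly nested Python lists may depend on the version):

import cudf

# Stand-in for a tree-reduction intermediate: collect applied to
# already-collected lists yields a list-of-lists column.
nested = cudf.DataFrame(
    {"a": [1, 2], "b_list": [[["x", "y"], ["z"]], [["p"], ["q", "r"]]]}
)

# explode removes one (the outermost) level of nesting, giving one row
# per partition-level list, which can then be collected again.
print(nested.explode("b_list"))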

This conflicts with @beckernick’s example, which, when run through dask-cudf’s codepath, would raise a TypeError because list is not iterable. I’m not sure whether the best solution would be to explicitly document example inputs for the aggs dict, or to add a check in the code to make sure all aggregations are wrapped in lists.

Right - it is certainly true that _groupby_partition_agg is expecting a dict with list values. This is by design, and requires the parent groupby_agg function to ensure that the aggregation plan is rewritten to meet this specification. Regarding the motivating example for this issue: it looks like you just need to expand upon this str check to handle callable objects as well. It also seems that we will need to add a check for general aggregations, like df.groupby("a").agg(list).
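To illustrate the kind of plan rewriting meant here, a rough sketch (the helper name _normalize_collect_aggs is hypothetical, not dask_cudf’s actual API):

def _normalize_collect_aggs(aggs):
    # Hypothetical helper: wrap bare aggregation specs in lists and map the
    # builtin list callable (and the "list" label) to cudf's "collect".
    normalized = {}
    for col, agg in aggs.items():
        if not isinstance(agg, list):
            agg = [agg]
        normalized[col] = [
            "collect" if a is list or a == "list" else a for a in agg
        ]
    return normalized

print(_normalize_collect_aggs({"b": list}))           # {'b': ['collect']}
print(_normalize_collect_aggs({"b": [list, "sum"]}))  # {'b': ['collect', 'sum']}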

I see - this definitely seems like something we should ultimately resolve in collect. However, since you are taking the dask_cudf route here, perhaps a simple fix is to fall back on _meta_nonempty when things fail?

try:
    _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs)
except NotImplementedError:
    _meta = ddf._meta_nonempty.groupby(gb_cols, as_index=as_index).agg(_aggs)

This falls outside the scope of this issue, so for now I’ll make a PR that adds support for the callable list, and later on a separate PR that expands _is_supported() to handle callables and optionally emulate dask.dataframe’s “list” behavior.

Thanks @charlesbluca ! I agree that the priority is to support the pandas/cudf list-aggregation syntax.

As we discussed, it is somewhat of a coincidence that the "list" label is supported by dask-dataframe (since it is not supported by pandas or cudf), but it shouldn’t hurt to support it in dask_cudf as well.

I haven’t looked through this deeply, but the quickest/easiest “fix” may be to add list to the optimized groupby-aggregation code path in dask_cudf. It may not require much work to get that code path working (would need to add support for list in _tree_node_agg, but I’m not seeing any real blockers).
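For completeness, a rough sketch of what an _is_supported-style check admitting the list aggregation could look like (the set of supported names below is illustrative, not the exact one in dask_cudf):

# Illustrative only: the real set of supported aggregations lives in
# dask_cudf.groupby and may differ from this sketch.
SUPPORTED_AGGS = {"count", "mean", "std", "var", "sum", "min", "max",
                  "first", "last", "collect", "list", list}

def _is_supported(aggs):
    if not isinstance(aggs, dict):
        return False
    for agg in aggs.values():
        if not isinstance(agg, list):
            agg = [agg]
        if any(a not in SUPPORTED_AGGS for a in agg):
            return False
    return True

print(_is_supported({"b": list, "c": [list, "sum"]}))  # True
print(_is_supported({"b": ["median"]}))                # False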