cudf: [BUG] Groupby collect list fails with Dask
cuDF recently implemented groupby collect list. I’d like to be able to use this with Dask, like I can on the CPU. Currently, Dask’s collect list goes down a code path (`s.apply(list)`, visible in the traceback below) that requires iterating through the object, which cuDF explicitly does not permit (https://github.com/rapidsai/cudf/issues/7481). We may want to explore a fix in Dask, or special-case this in dask_cudf.
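For reference, the equivalent single-GPU aggregation works in cuDF itself through the named `collect` aggregation (a minimal sketch; the `"collect"` name is the aggregation referenced in the commits linked at the bottom of this issue):

```python
import cudf

gdf = cudf.DataFrame({"a": [1, 1, 2], "b": ["x", "y", "z"]})
# Native cuDF groupby collect-list works on a single GPU:
print(gdf.groupby("a").agg({"b": "collect"}))
```

The full reproducer: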
```python
import cudf
import dask_cudf
import pandas as pd
import dask.dataframe as dd
from io import StringIO

data = """a,b
1595802,1611:0.92
1595802,1610:0.07
1524246,1807:0.92
1524246,1608:0.07"""

df = pd.read_csv(StringIO(data))
ddf = dd.from_pandas(df, 2)
gdf = cudf.from_pandas(df)
gddf = dask_cudf.from_cudf(gdf, 2)

print(ddf.groupby("a").agg({"b": list}).compute())  # works as expected
print(gddf.groupby("a").agg({"b": list}).compute())  # raises
```
The pandas-backed call prints the expected result:

```
         b
a
1595802  [1611:0.92, 1610:0.07]
1524246  [1807:0.92, 1608:0.07]
```

The dask_cudf call fails during Dask's metadata inference:
```
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
179 try:
--> 180 yield
181 except Exception as e:
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
5507 with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 5508 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
5509
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in _groupby_apply_funcs(df, *index, **kwargs)
920 for result_column, func, func_kwargs in funcs:
--> 921 r = func(grouped, **func_kwargs)
922
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in _apply_func_to_column(df_like, column, func)
966
--> 967 return func(df_like[column])
968
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in <lambda>(s)
843 _apply_func_to_column,
--> 844 dict(column=input_column, func=lambda s: s.apply(list)),
845 )
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py in apply(self, function)
422 ]
--> 423 chunk_results = [function(chk) for chk in chunks]
424
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py in <listcomp>(.0)
422 ]
--> 423 chunk_results = [function(chk) for chk in chunks]
424
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/series.py in __iter__(self)
1189 def __iter__(self):
-> 1190 cudf.utils.utils.raise_iteration_error(obj=self)
1191
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/utils/utils.py in raise_iteration_error(obj)
358 raise TypeError(
--> 359 f"{obj.__class__.__name__} object is not iterable. "
360 f"Consider using `.to_arrow()`, `.to_pandas()` or `.values_host` "
TypeError: Series object is not iterable. Consider using `.to_arrow()`, `.to_pandas()` or `.values_host` if you wish to iterate over the values.
The above exception was the direct cause of the following exception:
ValueError Traceback (most recent call last)
<ipython-input-16-08ab17754440> in <module>
22
23 print(ddf.groupby("a").agg({"b":list}).compute()) # works as expected
---> 24 print(gddf.groupby("a").agg({"b":list}).compute())
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in agg(self, arg, split_every, split_out)
1846 @derived_from(pd.core.groupby.DataFrameGroupBy)
1847 def agg(self, arg, split_every=None, split_out=1):
-> 1848 return self.aggregate(arg, split_every=split_every, split_out=split_out)
1849
1850
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask_cudf/groupby.py in aggregate(self, arg, split_every, split_out)
81
82 return super().aggregate(
---> 83 arg, split_every=split_every, split_out=split_out
84 )
85
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
1842 return self.size()
1843
-> 1844 return super().aggregate(arg, split_every=split_every, split_out=split_out)
1845
1846 @derived_from(pd.core.groupby.DataFrameGroupBy)
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
1622 split_out=split_out,
1623 split_out_setup=split_out_on_index,
-> 1624 sort=self.sort,
1625 )
1626
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
5459
5460 if meta is no_default:
-> 5461 meta_chunk = _emulate(chunk, *args, udf=True, **chunk_kwargs)
5462 meta = _emulate(
5463 aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
5506 """
5507 with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 5508 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
5509
5510
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/contextlib.py in __exit__(self, type, value, traceback)
128 value = type()
129 try:
--> 130 self.gen.throw(type, value, traceback)
131 except StopIteration as exc:
132 # Suppress StopIteration *unless* it's the same exception that
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
199 )
200 msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 201 raise ValueError(msg) from e
202
203
ValueError: Metadata inference failed in `_groupby_apply_funcs`.
You have supplied a custom function and Dask is unable to
determine the type of output that that function returns.
To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.
Original error is below:
------------------------
TypeError('Series object is not iterable. Consider using `.to_arrow()`, `.to_pandas()` or `.values_host` if you wish to iterate over the values.')
Traceback:
---------
File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/utils.py", line 180, in raise_on_meta_error
yield
File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py", line 5508, in _emulate
return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 921, in _groupby_apply_funcs
r = func(grouped, **func_kwargs)
File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 967, in _apply_func_to_column
return func(df_like[column])
File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 844, in <lambda>
dict(column=input_column, func=lambda s: s.apply(list)),
File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py", line 423, in apply
chunk_results = [function(chk) for chk in chunks]
File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py", line 423, in <listcomp>
chunk_results = [function(chk) for chk in chunks]
File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/series.py", line 1190, in __iter__
cudf.utils.utils.raise_iteration_error(obj=self)
File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/utils/utils.py", line 359, in raise_iteration_error
f"{obj.__class__.__name__} object is not iterable. "
Environment:

```
!conda list | grep "rapids\|dask\|numpy\|cupy\|arrow\|pandas"
# packages in environment at /raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331:
arrow-cpp 1.0.1 py37h2318771_14_cuda conda-forge
arrow-cpp-proc 3.0.0 cuda conda-forge
cudf 0.19.0a210331 cuda_10.2_py37_gc99fcef41b_313 rapidsai-nightly
cuml 0.19.0a210331 cuda10.2_py37_g83168076c_138 rapidsai-nightly
cupy 8.6.0 py37h7fc54ca_0 conda-forge
dask 2021.3.1 pyhd8ed1ab_0 conda-forge
dask-core 2021.3.1 pyhd8ed1ab_0 conda-forge
dask-cuda 0.19.0a210331 py37_45 rapidsai-nightly
dask-cudf 0.19.0a210331 py37_gc99fcef41b_313 rapidsai-nightly
libcudf 0.19.0a210331 cuda10.2_gbe2f0c000f_314 rapidsai-nightly
libcuml 0.19.0a210331 cuda10.2_g83168076c_138 rapidsai-nightly
libcumlprims 0.19.0a210316 cuda10.2_ge7e82a0_12 rapidsai-nightly
librmm 0.19.0a210331 cuda10.2_g9d1ba02_50 rapidsai-nightly
numpy 1.19.5 py37haa41c4c_1 conda-forge
pandas 1.2.3 py37hdc94413_0 conda-forge
pyarrow 1.0.1 py37hbeecfa9_14_cuda conda-forge
rmm 0.19.0a210331 cuda_10.2_py37_g9d1ba02_50 rapidsai-nightly
ucx 1.9.0+gcd9efd3 cuda10.2_0 rapidsai-nightly
ucx-proc 1.0.0 gpu rapidsai-nightly
ucx-py 0.19.0a210331 py37_gcd9efd3_46 rapidsai-nightly
```
Commits related to this issue
- Redirect callable aggregations to their named equivalent in dask-cuDF (#8048) Redirects Python built-in functions to their named equivalent aggregation in Dask-cuDF; this ensures that the Dask-cuDF c... — committed to rapidsai/cudf by charlesbluca 3 years ago
- Add collect list to dask-cudf groupby aggregations (#8045) Closes #7812 Adds support for cuDF's `collect` aggregation in dask-cuDF. Authors: - Charles Blackmon-Luca (https://github.com/charlesb... — committed to rapidsai/cudf by charlesbluca 3 years ago
In case any questions come up, this change was me. You can see the diff on #7818 in case that helps you find something you were used to looking for elsewhere, and feel free to ping me if something there is confusing.
Yeah - Good thought Nick. It would be great if there was a simple way to remove the outer-most offsets.
If there turns out to be no convenient way to “flatten” the list column in the way we need, we can still use `explode`. However, since the row count is not preserved, we would need to propagate separate dataframes through the reduction tree for each of the specified list aggregations. I have confirmed in some toy code that this will work, but the logic will need to be pretty messy to handle a general use case. So, I’d much prefer a simple cudf utility/primitive 😃 Here is the (ugly) toy example :)
Is there a reason you need to use `_AggregationFactory.collect` explicitly, rather than just including `list` in the aggregation again? I know you will get a nested list, but it seems that `_AggregationFactory.collect` gives you this as well?

Regarding list-column flattening: I think this is the key challenge for supporting list aggregation 😃. It may make sense to add an arg to `_tree_node_agg` and `_finalize_gb_agg` to specify that the input should be flattened (if a “*_list” column is present), and then you can add a single utility function to perform this flattening. (Note that I haven’t thought through this, so my understanding may be naive).
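A rough sketch of what such a flattening utility could look like (a hypothetical helper, not the code that eventually landed; it assumes `explode()` repeats the index, so keeping the group key in the index keeps values aligned):

```python
import cudf

def _flatten_list_column(df: cudf.DataFrame, key: str, col: str) -> cudf.DataFrame:
    # Explode an intermediate "*_list" column back to scalar rows. The row
    # count is not preserved, but carrying the group key in the index keeps
    # the exploded values aligned with their groups.
    return df.set_index(key)[col].explode().reset_index()

# Re-collecting on the exploded frame then yields list<T> per group, rather
# than the nested list<list<T>> produced by collecting twice, e.g.:
# _flatten_list_column(part, "a", "b_list").groupby("a").agg({"b_list": "collect"})
```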
EDIT: It looks like `df['col_list'].explode()` may flatten the column for you.

Right - It is certainly true that `_groupby_partition_agg` is expecting a dict with list values. This is by design, and requires the parent `groupby_agg` function to ensure that the aggregation plan is rewritten to meet this specification. Regarding the motivating example for this issue: It looks like you just need to expand upon this str check to handle callable objects as well. It also seems that we will need to add a check for general aggregations, like `df.groupby("a").agg(list)`.

I see - This definitely seems like something we should ultimately resolve in `collect`. However, since you are taking the dask_cudf route here, perhaps a simple fix is to fall back on `_meta_nonempty` when things fail?

Thanks @charlesbluca ! I agree that the priority is to support the pandas/cudf `list`-aggregation syntax. As we discussed, it is somewhat of a coincidence that the `"list"` label is supported by dask-dataframe (since it is not supported by pandas or cudf), but it shouldn’t hurt to support it in dask_cudf as well.

I haven’t looked through this deeply, but the quickest/easiest “fix” may be to add `list` to the optimized groupby-aggregation code path in dask_cudf. It may not require much work to get that code path working (would need to add support for `list` in `_tree_node_agg`, but I’m not seeing any real blockers).
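A minimal sketch of the callable-to-name redirect described above (a hypothetical helper; the actual change landed in #8048, which redirects Python built-ins to their named cuDF equivalents):

```python
# Hypothetical normalization pass: map builtin callables to cuDF's named
# aggregations before dispatching to dask_cudf's optimized groupby path.
_CALLABLE_TO_NAMED = {list: "collect", sum: "sum", max: "max", min: "min"}

def _redirect_aggs(arg):
    # Recurse through dict/list aggregation specs, e.g. {"b": [list, "max"]}.
    if isinstance(arg, dict):
        return {col: _redirect_aggs(spec) for col, spec in arg.items()}
    if isinstance(arg, (list, tuple)):
        return [_redirect_aggs(spec) for spec in arg]
    return _CALLABLE_TO_NAMED.get(arg, arg)

print(_redirect_aggs({"b": list}))  # {'b': 'collect'}
```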