dask: Groupby NUnique is slow and possibly buggy
Overview
I ran into some issues where groupby nunique was bottlenecking my computations. In chasing this down, I found that by re-implementing it I was able to get over a 100x speedup in lots of test cases, but I'm unsure what extra requirements there are to merge this / generalize my solution to work for all dask groupby nunique cases. (I'm not sure of the current status / support for multi-index groupbys.)
The core of the implementation is to group on a MultiIndex that includes the values, and to use .sum() and .count() rather than .apply(M.drop_duplicates).
Also note: .apply(M.drop_duplicates) seems to have a bug as well: https://github.com/pandas-dev/pandas/issues/26515
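At the pandas level, the core idea looks roughly like this (a minimal sketch with made-up data; not dask's actual code path):

import pandas as pd

# Toy data: group key x and values y
s_key = pd.Series(['a', 'a', 'a', 'b'], name='x')
s_val = pd.Series([1, 1, 2, 2], name='y')

# Count occurrences of each (key, value) pair, then count pairs per key
pair_counts = s_val.groupby([s_key, s_val]).count()
nunique_via_pairs = pair_counts.groupby(level='x').count()

# Same answer as the built-in nunique
assert nunique_via_pairs.equals(s_val.groupby(s_key).nunique())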
Slow Example
from dask.datasets import timeseries
df = timeseries()
super_slow = df['y'].groupby(df['x']).nunique()
result = super_slow.compute()
Note: I've never actually let this finish; on my computer it has run for over 10 minutes without completing.
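To compare the two approaches on something that actually finishes, one option (a sketch; the one-day range below is just an arbitrary choice) is to shrink the generated date range:

from dask.datasets import timeseries

# Much smaller dataset: one day instead of the default month
small = timeseries(start='2000-01-01', end='2000-01-02')
baseline = small['y'].groupby(small['x']).nunique().compute()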
Faster, bespoke implementation
ACA methods for nunique
import pandas as pd
from dask.dataframe.core import aca

def split_out_on_column(df, col_name='Unknown'):
    # Used by aca's split_out machinery to hash-partition intermediates on the groupby column.
    return df[col_name]

def unique_count_chunk(target_series, groupby_series, rename='__count'):
    # Per partition: count occurrences of each (group, value) pair.
    grouped = target_series.groupby([groupby_series, target_series]).count().rename(rename)
    return grouped.reset_index()

def unique_count_combine(df, groupby_name='groupby_col', target_name='target_col', rename='__count'):
    # Re-sum counts for (group, value) pairs that appear in multiple partitions.
    grouped = df.groupby([groupby_name, target_name])[rename].sum().rename(rename)
    return grouped.reset_index()

def nunique_aggregate(df, groupby_name='groupby_col', target_name='target_col', rename='__count'):
    # The number of distinct values per group is the number of surviving (group, value) pairs.
    grouped = df.groupby([groupby_name, target_name])[rename].sum().rename(rename)
    return grouped.groupby(groupby_name).count().astype('Int64').rename(rename)

def series_groupby_nunique(series_groupby, rename='__count', split_out=None, split_every=None):
    if isinstance(series_groupby.index, list):
        raise NotImplementedError("Multi-column groupby not supported in series-nunique")
    groupby_name = series_groupby.index.name
    target_name = series_groupby.obj.name
    meta = pd.Series([], dtype='Int64', name=rename,
                     index=pd.Series([], dtype=series_groupby.obj.dtype, name=groupby_name))
    return aca([series_groupby.obj, series_groupby.index],
               token='series-groupby-nunique',
               chunk=unique_count_chunk,
               chunk_kwargs={'rename': rename},
               aggregate=nunique_aggregate,
               aggregate_kwargs={'groupby_name': groupby_name, 'target_name': target_name, 'rename': rename},
               combine=unique_count_combine,
               combine_kwargs={'groupby_name': groupby_name, 'target_name': target_name, 'rename': rename},
               meta=meta,
               split_every=split_every, split_out=split_out,
               split_out_setup=split_out_on_column, split_out_setup_kwargs={'col_name': groupby_name})
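To make the chunk/combine/aggregate flow concrete, here is a pandas-level walkthrough using the functions above on two made-up partitions (an illustration only; aca itself handles the concatenation and tree reduction):

import pandas as pd

# Two hypothetical partitions of the same x/y data
part1 = pd.DataFrame({'x': ['a', 'a', 'b'], 'y': [1, 1, 2]})
part2 = pd.DataFrame({'x': ['a', 'b', 'b'], 'y': [1, 2, 3]})

# chunk: per-partition counts of each (x, y) pair
c1 = unique_count_chunk(part1['y'], part1['x'])
c2 = unique_count_chunk(part2['y'], part2['x'])

# combine: merge counts for pairs that appear in several partitions
combined = unique_count_combine(pd.concat([c1, c2]), groupby_name='x', target_name='y')

# aggregate: number of distinct y values per x  ->  a: 1, b: 2
final = nunique_aggregate(combined, groupby_name='x', target_name='y')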
Some aspects that make the above bespoke:
- Not allowing multi-column groupbys; this is just a Series grouped by a Series.
- Reliance on pandas to build the meta (not sure of best practice here).
- Using Int64 as my dtype since I prefer ints that can be nulled. Not sure of the implications of this / what the dask standards are?
- Using reset_index at each level, since concat on a MultiIndex seems to cause a memory leak? It was blowing out RAM with a large split_out=100, but by resetting the index and passing around dataframes instead of indexed series to be combined, everything behaved well in terms of RAM.
- Not tested against categorical columns, which have some different groupby behaviors.
from dask.datasets import timeseries
df = timeseries()
much_faster = series_groupby_nunique(df['y'].groupby(df['x']))
result = much_faster.compute()
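As a rough sanity check (a sketch of mine; the default timeseries data is small enough to compute into memory), the fast result can be compared against pandas' own groupby nunique:

# Compare against pandas' groupby nunique on the in-memory data
pdf = df[['x', 'y']].compute()
expected = pdf.groupby('x')['y'].nunique()
assert (result.astype('int64').sort_index() == expected.sort_index()).all()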
Questions:
What was the reasoning behind using .apply(drop_duplicates) as part of the nunique groupby agg?
Does the faster implementation above seem like a reasonable alternative, or am I missing an edge case / have a clear failure in my implementation? Should I work towards putting this in as a PR?
About this issue
- Original URL
- State: open
- Created 5 years ago
- Comments: 17 (12 by maintainers)
I've been spending quite some time poking around trying to figure out why this gets so slow. I still haven't figured it out, but one way around it is like the one mentioned in: https://github.com/rapidsai/cudf/issues/4663#issuecomment-602970891
SeriesGroupBy.unique is also slow; maybe when we have millions of groups there is a big for loop hanging things? I don't know, just wondering.
Any ideas on how to proceed? Maybe the people in this thread https://github.com/rapidsai/cudf/issues/4663#issuecomment-602980225 are better suited to tackle this than me. If someone gives me instructions on where to poke, I'd be happy to help.