dask: groupby aggregation does not scale well with the number of groups

It seems that there is a performance bug when doing grouping: the time and memory consumed by dask do not seem to scale well with the number of output rows. Please find below a script to produce example data; increase N to produce bigger input data.

import pandas as pd
import numpy as np

def randChar(f, numGrp, N):
    # draw N strings from numGrp distinct formatted values
    things = [f % x for x in range(numGrp)]
    return [things[x] for x in np.random.choice(numGrp, N)]

def randFloat(numGrp, N):
    # draw N floats from numGrp distinct values rounded to 4 decimals
    things = [round(100 * np.random.random(), 4) for x in range(numGrp)]
    return [things[x] for x in np.random.choice(numGrp, N)]

N = int(1e7)
K = 100
x = pd.DataFrame({
    'id1': randChar("id%03d", K, N),        # large groups (char)
    'id2': randChar("id%03d", K, N),        # large groups (char)
    'id3': randChar("id%010d", N//K, N),    # small groups (char)
    'id4': np.random.choice(K, N),          # large groups (int)
    'id5': np.random.choice(K, N),          # large groups (int)
    'id6': np.random.choice(N//K, N),       # small groups (int)
    'v1': np.random.choice(5, N),           # int in range [0,4]
    'v2': np.random.choice(5, N),           # int in range [0,4]
    'v3': randFloat(100, N)                 # numeric, e.g. 23.5749
})
x.to_csv("example.csv", encoding='utf-8', index=False)

And the following code to perform the grouping.

import gc
import timeit
import pandas as pd
import dask as dk
import dask.dataframe as dd

print(pd.__version__)
print(dk.__version__)

x = dd.read_csv("example.csv", na_filter=False).persist()
print(len(x))

gc.collect()
t_start = timeit.default_timer()
ans = x.groupby(['id1']).agg({'v1':'sum'}).compute()
t = timeit.default_timer() - t_start
print(len(ans))
print(t)
del ans

gc.collect()
t_start = timeit.default_timer()
ans = x.groupby(['id3']).agg({'v1':'sum', 'v3':'mean'}).compute()
t = timeit.default_timer() - t_start
print(len(ans))
print(t)
del ans

Running on Python 3.6.5, pandas 0.23.4, dask 0.19.2, on a single machine with 20 CPUs and 125 GB of memory.

For 1e7 input rows: 100 output rows, timing 0.4032 s; 1e5 output rows, timing 2.1272 s.

For 1e8 input rows: 100 output rows, timing 3.2559 s; 1e6 output rows, timing 149.8847 s.

Additionally, I checked an alternative approach: instead of .compute(), using a Client and .persist() (adding print(len(...)) to ensure persist had actually kicked in); a sketch of this variant follows the table below. In both cases the time was not acceptable (see the table below; units are seconds).

in_rows  groups  compute()  persist()
1e+07    fewer       0.395      0.425
1e+07    more        2.124      2.145
1e+08    fewer       3.239      3.420
1e+08    more      148.211    148.043
1e+09    fewer          NA    848.911
1e+09    more           NA   5364.569
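
For reference, this is roughly how the persist() variant can be timed (a minimal sketch, assuming a local dask.distributed Client; it mirrors the compute() code above rather than reproducing the exact benchmark script):

from dask.distributed import Client
import timeit
import dask.dataframe as dd

client = Client()  # local cluster; one worker process per core by default

x = dd.read_csv("example.csv", na_filter=False).persist()
print(len(x))  # blocks until the input is actually loaded into memory

t_start = timeit.default_timer()
ans = x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}).persist()
print(len(ans))  # blocks until the persisted aggregation is materialised
t = timeit.default_timer() - t_start
print(t)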

About this issue

  • Original URL: https://github.com/dask/dask/issues/4001
  • State: open
  • Created 6 years ago
  • Comments: 15 (9 by maintainers)

Most upvoted comments

So that Dask doesn’t have to look at all of your data before coming up with a plan. One of the costs of laziness is poor planning.

On Tue, Feb 26, 2019 at 5:54 AM Jan Gorecki notifications@github.com wrote:

@alex959595 thanks for the suggestion. Any idea why this is not optimised internally, so that the aggregation API can be data agnostic, relying only on metadata (the schema)?

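(Concretely: because the plan is built lazily, Dask does not know the group count when it decides how to combine results. A user who is willing to pay for one extra pass over the data can count the groups and pick split_out from that; the sketch below uses a target of roughly 1e6 groups per output partition, which is an illustrative assumption, not a Dask default.)

# One explicit pass over the data to count the groups, then derive split_out from it.
ngroups = x['id3'].nunique().compute()
split_out = max(1, ngroups // 1_000_000)  # illustrative heuristic: ~1e6 groups per output partition
ans = x.groupby('id3').v1.sum(split_out=split_out).compute()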

By default Dask combines all groupby-aggregate chunks into a single pandas DataFrame output, no matter the resulting size, as shown in the "Many groups" example here: https://examples.dask.org/dataframes/02-groupby.html.

By using the split_out parameter you should be able to control this size.
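
For example, the effect shows up in the number of output partitions. A minimal sketch, assuming x is the dask dataframe from the reproduction above and that the groupby reduction accepts split_out (as it does in current Dask releases):

# By default the aggregation collapses to a single output partition, however many
# groups there are; split_out spreads the groups over several partitions instead.
agg1 = x.groupby('id3').v1.sum()
print(agg1.npartitions)  # 1
agg4 = x.groupby('id3').v1.sum(split_out=4)
print(agg4.npartitions)  # 4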

It also automatically combines the resulting aggregation chunks (every 8 chunks by default). FYI: this is an arbitrary value set in dask/dataframe/core.py line 3595. This value is fine for most cases, but if the aggregation chunks are large, it could take a substantial amount of time to combine them (especially if chunks need to be transferred from another worker).

I think you can ensure the output of the groupby chunks doesn't get too big by setting split_out=4 and split_every=False.

df.groupby('id').x.mean(split_out=4, split_every=False)
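
Applied to the reproduction in this issue, a rough sketch (assuming .agg() accepts the same split_out and split_every keywords, which it does in recent Dask releases) would be:

ans = x.groupby(['id3']).agg(
    {'v1': 'sum', 'v3': 'mean'},
    split_out=4,        # spread the many id3 groups over 4 output partitions
    split_every=False,  # skip the intermediate tree-reduction combine step
).compute()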

@martindurant please see the parquet error above. It looks like the current experience of writing from Spark and reading from Dask still isn't smooth for novice users.

@jangorecki you might find this notebook helpful for your benchmarks: https://gist.github.com/c0b84b689238ea46cf9aa1c79155fe34