dask: groupby aggregation does not scale well with the number of groups
It seems that there is a performance bug when doing grouping: the time and memory consumed by dask do not scale well with the number of output rows (groups).
Please find below a script to produce the example data; increase N to produce bigger input data.
```python
import pandas as pd
import numpy as np

def randChar(f, numGrp, N):
    things = [f % x for x in range(numGrp)]
    return [things[x] for x in np.random.choice(numGrp, N)]

def randFloat(numGrp, N):
    things = [round(100 * np.random.random(), 4) for x in range(numGrp)]
    return [things[x] for x in np.random.choice(numGrp, N)]

N = int(1e7)
K = 100
x = pd.DataFrame({
    'id1': randChar("id%03d", K, N),      # large groups (char)
    'id2': randChar("id%03d", K, N),      # large groups (char)
    'id3': randChar("id%010d", N//K, N),  # small groups (char)
    'id4': np.random.choice(K, N),        # large groups (int)
    'id5': np.random.choice(K, N),        # large groups (int)
    'id6': np.random.choice(N//K, N),     # small groups (int)
    'v1': np.random.choice(5, N),         # int in range [1,5]
    'v2': np.random.choice(5, N),         # int in range [1,5]
    'v3': randFloat(100, N)               # numeric, e.g. 23.5749
})
x.to_csv("example.csv", encoding='utf-8', index=False)
```
And the following code to perform the grouping.
```python
import os
import gc
import timeit
import pandas as pd
import dask as dk
import dask.dataframe as dd

print(pd.__version__)
print(dk.__version__)

x = dd.read_csv("example.csv", na_filter=False).persist()
print(len(x))
gc.collect()

# aggregation producing few groups (100)
t_start = timeit.default_timer()
ans = x.groupby(['id1']).agg({'v1': 'sum'}).compute()
t = timeit.default_timer() - t_start
print(len(ans))
print(t)
del ans
gc.collect()

# aggregation producing many groups (N/K)
t_start = timeit.default_timer()
ans = x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}).compute()
t = timeit.default_timer() - t_start
print(len(ans))
print(t)
del ans
```
Running on Python 3.6.5, pandas 0.23.4, dask 0.19.2, on a single machine with 20 CPUs and 125 GB memory.
Timings:
- input 1e7 rows: output 100 rows in 0.4032 s; output 1e5 rows in 2.1272 s
- input 1e8 rows: output 100 rows in 3.2559 s; output 1e6 rows in 149.8847 s
Additionally, I checked an alternative approach: instead of .compute(), using a distributed Client and .persist() (adding print(len(.)) to ensure the persist has actually kicked in). In both cases the time was not acceptable (see the table below; units are seconds). A sketch of this variant follows the table.
| in_rows | groups | compute() | persist() |
|---|---|---|---|
| 1e+07 | fewer | 0.395 | 0.425 |
| 1e+07 | more | 2.124 | 2.145 |
| 1e+08 | fewer | 3.239 | 3.420 |
| 1e+08 | more | 148.211 | 148.043 |
| 1e+09 | fewer | NA | 848.911 |
| 1e+09 | more | NA | 5364.569 |
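For reference, a minimal sketch of the persist()-based variant described above, assuming a local dask.distributed Client (the cluster size of 20 workers is illustrative, chosen to match the 20-CPU machine):

```python
import timeit

import dask.dataframe as dd
from dask.distributed import Client

client = Client(n_workers=20)  # illustrative; matches the 20-CPU machine

x = dd.read_csv("example.csv", na_filter=False).persist()
print(len(x))  # block until the input data is actually persisted in memory

t_start = timeit.default_timer()
ans = x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}).persist()
print(len(ans))  # forces the persisted aggregation to materialize before timing stops
print(timeit.default_timer() - t_start)
```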
About this issue
- State: open
- Created 6 years ago
- Comments: 15 (9 by maintainers)
So that Dask doesn’t have to look at all of your data before coming up with a plan. One of the costs of laziness is poor planning.
By default, Dask combines all groupby-apply chunks into a single pandas DataFrame output, no matter the resulting size, as shown in the "Many groups" example at https://examples.dask.org/dataframes/02-groupby.html. By using the split_out parameter you should be able to control this size.
It also automatically combines the resulting aggregation chunks (every 8 chunks by default). FYI: this is an arbitrary value set in dask/dataframe/core.py line 3595. This value is fine for most cases, but if the aggregation chunks are large it could take a substantial amount of time to combine them (especially if a chunk needs to be transferred from another worker).
I think you can ensure the output of the groupby chunks doesn't get too big by setting split_out=4 and split_every=False:
```python
df.groupby('id').x.mean(split_out=4, split_every=False)
```

@martindurant please see the parquet error above. It looks like the current experience of writing from Spark and reading from dask still isn't smooth for novice users.
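Building on the split_out/split_every suggestion above, a minimal sketch applying it to the aggregation from the original report (assuming .agg accepts these keyword arguments in this dask version; split_out=4 is illustrative):

```python
# Keep the aggregated result split across 4 output partitions instead of
# combining everything into a single pandas DataFrame; split_every=False
# disables the intermediate tree-reduction combine step.
ans = (
    x.groupby(['id3'])
     .agg({'v1': 'sum', 'v3': 'mean'}, split_out=4, split_every=False)
     .compute()
)
```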
@jangorecki you might find this notebook helpful for your benchmarks: https://gist.github.com/c0b84b689238ea46cf9aa1c79155fe34