dask: Fastparquet reader with degraded performance in dask==2.2
Versions
dask==2.2 fastparquet==0.3.1 pyarrow==0.14.1
Code to reproduce
Create and save parquet files
import numpy as np
import pandas as pd
import dask.dataframe as dd
import random
import shutil
import fastparquet
import os
# generate a dummy dataframe
def generate_df(size, ncols):
    dates = pd.date_range('1980-01-01', periods=size, freq='1T')
    data = np.random.rand(size, ncols)
    df = pd.DataFrame(data=data, columns=[str(col) for col in range(ncols)], index=dates)
    return df
Generate dask dataframe
small_df = generate_df(1000000, ncols=20)
df1 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_fastparquet'):
    shutil.rmtree('test_fastparquet')
df1.to_parquet('test_fastparquet', compression='snappy', engine='fastparquet')
small_df = generate_df(1000000, ncols=20)
df2 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_pyarrow'):
    shutil.rmtree('test_pyarrow')
df2.to_parquet('test_pyarrow', compression='snappy', engine='pyarrow')
Reading metadata and compute for 20 columns
%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=True)
19.2 s ± 1.91 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=False)
104 ms ± 4.59 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_pyarrow/', engine='pyarrow', gather_statistics=True)
37.8 ms ± 1.02 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', gather_statistics=False)
9.74 ms ± 121 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
ddf1 = dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=True)
ddf2 = dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=False)
ddf3 = dd.read_parquet('test_pyarrow/', engine='pyarrow', gather_statistics=True)
ddf4 = dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', gather_statistics=False)
%%timeit
ddf1.compute()
9.19 s ± 264 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf2.compute()
9.06 s ± 254 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf3.compute()
1.05 s ± 37.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf4.compute()
909 ms ± 52.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Generate dask dataframe
small_df = generate_df(1000000, ncols=2)
df1 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_fastparquet'):
    shutil.rmtree('test_fastparquet')
df1.to_parquet('test_fastparquet', compression='snappy', engine='fastparquet')
small_df = generate_df(1000000, ncols=2)
df2 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_pyarrow'):
    shutil.rmtree('test_pyarrow')
df2.to_parquet('test_pyarrow', compression='snappy', engine='pyarrow')
Reading metadata and compute for 2 columns
%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=True)
476 ms ± 13 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=False)
23.3 ms ± 1.18 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_pyarrow/', engine='pyarrow', gather_statistics=True)
22.7 ms ± 3.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', gather_statistics=False)
9.16 ms ± 2.18 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
ddf1 = dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=True)
ddf2 = dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=False)
ddf3 = dd.read_parquet('test_pyarrow/', engine='pyarrow', gather_statistics=True)
ddf4 = dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', gather_statistics=False)
%%timeit
ddf1.compute()
2.18 s ± 151 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf2.compute()
2.02 s ± 59.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf3.compute()
326 ms ± 54.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf4.compute()
297 ms ± 15.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Now with dask==2.1.0
Versions
dask==2.1 fastparquet==0.3.1 pyarrow==0.14.1
(dask 2.1 predates the parquet-engine refactor, so these runs use the older infer_divisions keyword rather than gather_statistics.)
Generate dask dataframe
small_df = generate_df(1000000, ncols=20)
df1 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_fastparquet'):
    shutil.rmtree('test_fastparquet')
df1.to_parquet('test_fastparquet', compression='snappy', engine='fastparquet')
small_df = generate_df(1000000, ncols=20)
df2 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_pyarrow'):
    shutil.rmtree('test_pyarrow')
df2.to_parquet('test_pyarrow', compression='snappy', engine='pyarrow')
Reading metadata and compute for 20 columns
%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=True)
105 ms ± 455 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=False)
100 ms ± 1.68 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_pyarrow/', engine='pyarrow', infer_divisions=True)
42 ms ± 8.58 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', infer_divisions=False)
26.2 ms ± 353 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
ddf1 = dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=True)
ddf2 = dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=False)
ddf3 = dd.read_parquet('test_pyarrow/', engine='pyarrow', infer_divisions=True)
ddf4 = dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', infer_divisions=False)
%%timeit
ddf1.compute()
558 ms ± 4.85 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf2.compute()
563 ms ± 12.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf3.compute()
481 ms ± 3.88 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf4.compute()
503 ms ± 23.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Generate dask dataframe
small_df = generate_df(1000000, ncols=2)
df1 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_fastparquet'):
    shutil.rmtree('test_fastparquet')
df1.to_parquet('test_fastparquet', compression='snappy', engine='fastparquet')
small_df = generate_df(1000000, ncols=2)
df2 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_pyarrow'):
    shutil.rmtree('test_pyarrow')
df2.to_parquet('test_pyarrow', compression='snappy', engine='pyarrow')
Reading metadata and compute for 2 columns
%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=True)
20.6 ms ± 784 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=False)
19.5 ms ± 933 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_pyarrow/', engine='pyarrow', infer_divisions=True)
24.3 ms ± 2 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', infer_divisions=False)
17.8 ms ± 577 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
ddf1 = dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=True)
ddf2 = dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=False)
ddf3 = dd.read_parquet('test_pyarrow/', engine='pyarrow', infer_divisions=True)
ddf4 = dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', infer_divisions=False)
%%timeit
ddf1.compute()
124 ms ± 11.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf2.compute()
140 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
ddf3.compute()
267 ms ± 24.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
ddf4.compute()
331 ms ± 42.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
This behaviour is clearly undesirable and is most likely related to the recent refactor of the parquet engine. I did not go further in isolating the problem, since you probably have a better idea of what is going on. I'm more than happy to help with this, by the way!
Note: Everything is running locally on my desktop.
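As a first step toward isolating the slow path, one could time fastparquet's own statistics gathering outside of dask. A minimal sketch, assuming the consolidated _metadata file written by to_parquet above and fastparquet's ParquetFile.statistics property (treat both as assumptions about this setup):
import time
import fastparquet

# open the dataset through its consolidated _metadata footer
pf = fastparquet.ParquetFile('test_fastparquet/_metadata')

start = time.perf_counter()
stats = pf.statistics  # per-column min/max/null counts for every row group
elapsed = time.perf_counter() - start
print('statistics over %d row groups: %.3f s' % (len(pf.row_groups), elapsed))
If this single call accounts for most of the 19 s seen with gather_statistics=True, the regression is in the statistics step rather than in the reads themselves.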
About this issue
- Original URL
- State: closed
- Created 5 years ago
- Reactions: 2
- Comments: 19 (15 by maintainers)
Commits related to this issue
- Only calculate dataset stats once Ref: https://github.com/dask/dask/issues/5200 — committed to martindurant/fastparquet by deleted user 5 years ago
- Only calculate dataset stats once (#453) Ref: https://github.com/dask/dask/issues/5200 — committed to dask/fastparquet by martindurant 5 years ago
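The commit title suggests the fix was to memoize the dataset statistics so they are computed at most once per file. A toy sketch of that caching pattern (illustrative only, not fastparquet's actual code; all names here are made up):
class DatasetStats:
    """Toy model: compute expensive per-row-group stats at most once."""

    def __init__(self, row_groups):
        self.row_groups = row_groups
        self._stats = None  # cache slot, filled on first access

    @property
    def statistics(self):
        if self._stats is None:
            # the expensive scan over row-group metadata runs only here
            self._stats = [{'min': min(rg), 'max': max(rg)}
                           for rg in self.row_groups]
        return self._stats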
With the following rather shonky code, I go from … to … on (local) distributed.
@rjzamora, maybe you'd like to tidy that up a bit?
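(For context, "local distributed" presumably means a dask.distributed cluster running on one machine; the exact setup is not shown in the thread, but it would typically be started along these lines:)
from dask.distributed import Client

client = Client()  # local scheduler plus worker processes on this machine
print(client)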
Releasing fastparquet with that fix. That will be the last fastparquet release for py2!