dask: Fastparquet reader with degraded performance in dask==2.2

Versions

dask==2.2 fastparquet==0.3.1 pyarrow==0.14.1

Code to reproduce

Create and save parquet files

import numpy as np
import pandas as pd
import dask.dataframe as dd
import random
import shutil
import fastparquet
import os

# Generate a dummy DataFrame of random floats with a minute-frequency DatetimeIndex
def generate_df(size, ncols):

    dates = pd.date_range('1980-01-01', periods=size, freq='1T')
    data = np.random.rand(size, ncols)
    
    df = pd.DataFrame(data=data, columns=[str(col) for col in range(ncols)], index=dates)
    return df
  

Generate dask dataframe

small_df = generate_df(1000000, ncols=20)

df1 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_fastparquet'):
    shutil.rmtree('test_fastparquet')
    
df1.to_parquet('test_fastparquet', compression='snappy', engine='fastparquet')

small_df = generate_df(1000000, ncols=20)

df2 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_pyarrow'):
    shutil.rmtree('test_pyarrow')
    
df2.to_parquet('test_pyarrow', compression='snappy', engine='pyarrow')

Reading metadata and computing for 20 columns

%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=True)
19.2 s ± 1.91 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=False)
104 ms ± 4.59 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_pyarrow/', engine='pyarrow', gather_statistics=True)
37.8 ms ± 1.02 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', gather_statistics=False)
9.74 ms ± 121 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

ddf1 = dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=True)
ddf2 = dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=False)
ddf3 = dd.read_parquet('test_pyarrow/', engine='pyarrow', gather_statistics=True)
ddf4 = dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', gather_statistics=False)

%%timeit
ddf1.compute()
9.19 s ± 264 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf2.compute()
9.06 s ± 254 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf3.compute()
1.05 s ± 37.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf4.compute()
909 ms ± 52.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Generate dask dataframe

small_df = generate_df(1000000, ncols=2)

df1 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_fastparquet'):
    shutil.rmtree('test_fastparquet')
    
df1.to_parquet('test_fastparquet', compression='snappy', engine='fastparquet')

small_df = generate_df(1000000, ncols=2)

df2 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_pyarrow'):
    shutil.rmtree('test_pyarrow')
    
df2.to_parquet('test_pyarrow', compression='snappy', engine='pyarrow')

Reading metadata and computing for 2 columns

%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=True)
476 ms ± 13 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=False)
23.3 ms ± 1.18 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_pyarrow/', engine='pyarrow', gather_statistics=True)
22.7 ms ± 3.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', gather_statistics=False)
9.16 ms ± 2.18 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

ddf1 = dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=True)
ddf2 = dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=False)
ddf3 = dd.read_parquet('test_pyarrow/', engine='pyarrow', gather_statistics=True)
ddf4 = dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', gather_statistics=False)

%%timeit
ddf1.compute()
2.18 s ± 151 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf2.compute()
2.02 s ± 59.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf3.compute()
326 ms ± 54.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf4.compute()
297 ms ± 15.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Now with dask==2.1.0

Versions

dask==2.1 fastparquet==0.3.1 pyarrow==0.14.1

Generate dask dataframe

small_df = generate_df(1000000, ncols=20)

df1 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_fastparquet'):
    shutil.rmtree('test_fastparquet')
    
df1.to_parquet('test_fastparquet', compression='snappy', engine='fastparquet')

small_df = generate_df(1000000, ncols=20)

df2 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_pyarrow'):
    shutil.rmtree('test_pyarrow')
    
df2.to_parquet('test_pyarrow', compression='snappy', engine='pyarrow')

Reading metadata and computing for 20 columns

%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=True)
105 ms ± 455 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=False)
100 ms ± 1.68 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_pyarrow/', engine='pyarrow', infer_divisions=True)
42 ms ± 8.58 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', infer_divisions=False)
26.2 ms ± 353 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

ddf1 = dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=True)
ddf2 = dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=False)
ddf3 = dd.read_parquet('test_pyarrow/', engine='pyarrow', infer_divisions=True)
ddf4 = dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', infer_divisions=False)

%%timeit
ddf1.compute()
558 ms ± 4.85 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf2.compute()
563 ms ± 12.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf3.compute()
481 ms ± 3.88 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf4.compute()
503 ms ± 23.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Generate dask dataframe

small_df = generate_df(1000000, ncols=2)

df1 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_fastparquet'):
    shutil.rmtree('test_fastparquet')
    
df1.to_parquet('test_fastparquet', compression='snappy', engine='fastparquet')

small_df = generate_df(1000000, ncols=2)

df2 = dd.from_pandas(small_df, npartitions=40)
if os.path.exists('test_pyarrow'):
    shutil.rmtree('test_pyarrow')
    
df2.to_parquet('test_pyarrow', compression='snappy', engine='pyarrow')

Reading metadata and computing for 2 columns

%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=True)
20.6 ms ± 784 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=False)
19.5 ms ± 933 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_pyarrow/', engine='pyarrow', infer_divisions=True)
24.3 ms ± 2 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', infer_divisions=False)
17.8 ms ± 577 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

ddf1 = dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=True)
ddf2 = dd.read_parquet('test_fastparquet/', engine='fastparquet', infer_divisions=False)
ddf3 = dd.read_parquet('test_pyarrow/', engine='pyarrow', infer_divisions=True)
ddf4 = dd.read_parquet('test_pyarrow/*.parquet', engine='pyarrow', infer_divisions=False)

%%timeit
ddf1.compute()
124 ms ± 11.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf2.compute()
140 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
ddf3.compute()
267 ms ± 24.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
ddf4.compute()
331 ms ± 42.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

This behaviour is clearly not desired and is most likely related to the recent refactor of the parquet engine. I did not try to isolate the problem any further, since you will probably have a better idea of what is going on. I’m more than happy to help with this, by the way!

Note: Everything is running locally on my desktop.
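
To put the 20-column numbers side by side, here is a small sketch that divides the dask 2.2 timings by the dask 2.1 timings reported above. The dictionary layout and the `slowdown` helper are only illustrative names, not part of dask. Note that the keyword differs between the two runs (`gather_statistics=True` on 2.2, `infer_divisions=True` on 2.1), so this is an approximate comparison rather than an exact like-for-like one.

```python
# Mean wall times copied from the report above, converted to seconds.
# Keys are (engine, dask version). The 2.2 runs used gather_statistics=True,
# the 2.1 runs used infer_divisions=True.
metadata_20_cols = {
    ("fastparquet", "2.2"): 19.2,
    ("fastparquet", "2.1"): 0.105,
    ("pyarrow", "2.2"): 0.0378,
    ("pyarrow", "2.1"): 0.042,
}
compute_20_cols = {
    ("fastparquet", "2.2"): 9.19,
    ("fastparquet", "2.1"): 0.558,
    ("pyarrow", "2.2"): 1.05,
    ("pyarrow", "2.1"): 0.481,
}


def slowdown(timings, engine):
    """Return how many times slower dask 2.2 is than dask 2.1 for one engine."""
    return timings[(engine, "2.2")] / timings[(engine, "2.1")]


for label, timings in [("read_parquet", metadata_20_cols), ("compute", compute_20_cols)]:
    for engine in ("fastparquet", "pyarrow"):
        print(f"{label:>12} {engine:<12} {slowdown(timings, engine):6.1f}x")
```

With these figures, the fastparquet metadata read comes out at roughly 180x slower on 2.2 and `compute()` at roughly 16x slower, while pyarrow stays within about 2x in both cases.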

About this issue

  • State: closed
  • Created 5 years ago
  • Reactions: 2
  • Comments: 19 (15 by maintainers)

Most upvoted comments

With the following rather shonky code, I go from

In [4]: df = dd.read_parquet('test_fastparquet/', engine='fastparquet', gather_statistics=False)
In [5]: %timeit len(df)
489 ms ± 152 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

to

In [6]: %timeit len(df)
328 ms ± 97.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

on (local) distributed

--- a/dask/dataframe/io/parquet/fastparquet.py
+++ b/dask/dataframe/io/parquet/fastparquet.py
@@ -292,16 +292,22 @@ class FastParquetEngine(Engine):
         # This is a list of row-group-descriptor dicts, or file-paths
         # if we have a list of files and gather_statistics=False
         if not parts:
-            parts = pf.row_groups
+            partsin = pf.row_groups
+            pf.fmd.key_value_metadata = None
         else:
             pf = None
-        parts = [
-            {
+            partsin = parts
+        parts = []
+        for piece in partsin:
+            if pf is not None:
+                for col in piece.columns:
+                    col.meta_data.statistics = None
+                    col.meta_data.encoding_stats = None
+            part = {
                 "piece": piece,
                 "kwargs": {"pf": pf, "categories": categories_dict or categories},
             }
-            for piece in parts
-        ]
+            parts.append(part)

         return (meta, stats, parts)

@rjzamora, maybe you’d like to tidy that up a bit?

Releasing fastparquet with that fix. That will be the last of fastparquet for py2!
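
One possible reading of the patch quoted above is that it shrinks what each part carries into the task graph by clearing per-column statistics before the parts are built. The sketch below illustrates that idea with plain Python objects and `pickle`. `ColumnMeta`, `RowGroup`, `graph_payload_size` and `strip_statistics` are hypothetical stand-ins invented for this example; they are not fastparquet or dask classes, and the real row-group objects nest these fields under `col.meta_data`.

```python
import pickle


class ColumnMeta:
    """Hypothetical stand-in for one column chunk's metadata."""

    def __init__(self):
        self.statistics = {"min": b"\x00" * 8, "max": b"\xff" * 8, "null_count": 0}
        self.encoding_stats = [("PLAIN", 1), ("RLE", 1)]
        self.file_offset = 4


class RowGroup:
    """Hypothetical stand-in for a row-group descriptor, one entry per column."""

    def __init__(self, ncols):
        self.columns = [ColumnMeta() for _ in range(ncols)]


def graph_payload_size(pieces):
    """Total bytes needed to pickle every piece, used as a proxy for graph size."""
    return sum(len(pickle.dumps(piece)) for piece in pieces)


def strip_statistics(pieces):
    """Loosely mirror the patch: clear statistics, assumed unused when reading data."""
    for piece in pieces:
        for col in piece.columns:
            col.statistics = None
            col.encoding_stats = None
    return pieces


# 40 row groups of 20 columns, matching the shape of the larger test dataset.
pieces = [RowGroup(ncols=20) for _ in range(40)]
before = graph_payload_size(pieces)
after = graph_payload_size(strip_statistics(pieces))
print(f"before: {before} bytes, after: {after} bytes")
```

The absolute byte counts here mean nothing for real parquet metadata; the point is only that the serialized size grows with the number of columns times the number of row groups, which would be consistent with the 20-column case degrading far more than the 2-column case.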