dask: dd.read_parquet fails on directory path

What happened:

After upgrading from Dask 2.11.0 to 2.22.0, dd.read_parquet can no longer read some Parquet dataset directories that loaded fine before the upgrade.

What you expected to happen:

I expected the datasets to be loaded without changing the call to dd.read_parquet.

Minimal Complete Verifiable Example:

Example dataset: https://drive.google.com/file/d/1Qn5k9xZT_yegqisttGW1oGw-MwWHCY_E/view?usp=sharing

old
├── _common_metadata
├── _metadata
├── part.0.parquet
├── part.1.parquet
└── part.2.parquet
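
For context, here is a minimal sketch of how a dataset with this layout might have been produced (an assumption on my part: the directory was written by an older Dask with the pyarrow engine, which also emits the _metadata and _common_metadata files):

import pandas as pd
import dask.dataframe as dd

# hypothetical data with the same columns as the example dataset
df = pd.DataFrame({
    "date": pd.date_range("2019-01-01", periods=9),
    "title": ["Foo"] * 9,
    "description": ["Foo"] * 9,
})
ddf = dd.from_pandas(df, npartitions=3)   # three partitions -> part.0 ... part.2
ddf.to_parquet("old", engine="pyarrow")   # writes the part files plus the metadata files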

Trying to load this directory with dd.read_parquet fails:

>>> import dask.dataframe as dd
>>> dd.read_parquet("old").compute()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/base.py", line 447, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/threaded.py", line 84, in get
    **kwargs
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/local.py", line 486, in get_async
    raise_exception(exc, tb)
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/local.py", line 316, in reraise
    raise exc
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in read_parquet_part
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in <listcomp>
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 753, in read_partition
    piece, columns, partitions, **kwargs
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 812, in _parquet_piece_as_arrow
    **kwargs.get("read", {}),
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/pyarrow/parquet.py", line 704, in read
    reader = self.open()
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/pyarrow/parquet.py", line 680, in open
    reader = self.open_file_func(self.path)
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/fsspec/spec.py", line 724, in open
    **kwargs
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/fsspec/implementations/local.py", line 107, in _open
    return LocalFileOpener(path, mode, fs=self, **kwargs)
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/fsspec/implementations/local.py", line 173, in __init__
    self._open()
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/fsspec/implementations/local.py", line 178, in _open
    self.f = open(self.path, mode=self.mode)
IsADirectoryError: [Errno 21] Is a directory: '/Users/matteo/Downloads/old'
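
As a sanity check (my addition, not part of the original report), pyarrow should be able to read the same directory directly, which would suggest the files and metadata are fine and the problem lies in how Dask resolves the dataset pieces to file paths:

>>> import pyarrow.parquet as pq
>>> # assumption: reading the directory through pyarrow directly succeeds
>>> pq.read_table("old").to_pandas()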

Reading the files with a glob pattern and writing them out to a new directory works around the problem:

>>> dd.read_parquet("old/*.parquet").to_parquet("new")
>>> dd.read_parquet("new").compute()
            date title description
index
0     2019-01-01   Foo         Foo
1     2019-01-02   Foo         Foo
2     2019-01-03   Foo         Foo
3     2019-01-04   Foo         Foo
...

I have many large datasets that trigger the error above, so I’d like to be able to load the original directories without having to read and rewrite them.
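
One alternative I have not tried (an assumption on my part; it requires fastparquet to be installed and may not behave identically to the pyarrow engine) would be to read these directories with the other engine:

>>> # hypothetical: fall back to the fastparquet engine for the old-style directories
>>> dd.read_parquet("old", engine="fastparquet").compute()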

Using a glob pattern is also not an option for me, because it does not load the metadata required for filtering:

>>> dd.read_parquet("old/*.parquet", filters=[("title", "==", "Foo")]).compute()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 239, in read_parquet
    **kwargs
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 651, in read_metadata
    paths, fs, split_row_groups, gather_statistics, filters, dataset_kwargs
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 161, in _gather_metadata
    dataset, base, fns = _get_dataset_object(paths, fs, filters, dataset_kwargs)
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 121, in _get_dataset_object
    paths, filesystem=fs, filters=filters, **dataset_kwargs
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/pyarrow/parquet.py", line 1197, in __init__
    self._filter(filters)
  File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/pyarrow/parquet.py", line 1311, in _filter
    accepts_filter = self.partitions.filter_accepts_partition
AttributeError: 'NoneType' object has no attribute 'filter_accepts_partition'

I’d like to be able to load directories with filters, like this:

>>> dd.read_parquet("new", filters=[("title", "==", "Foo")]).compute()
            date title description
index
0     2019-01-01   Foo         Foo
1     2019-01-02   Foo         Foo
2     2019-01-03   Foo         Foo
3     2019-01-04   Foo         Foo
...
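
A possible workaround on the writing side (my own assumption, not something verified in this report, and it still requires rewriting the data once) is to partition the dataset on the filter column, so filtering can prune whole directories instead of relying on row-group statistics:

>>> # hypothetical: write a hive-partitioned copy, then filter on the partition column
>>> dd.read_parquet("old/*.parquet").to_parquet("new_by_title", partition_on=["title"])
>>> dd.read_parquet("new_by_title", filters=[("title", "==", "Foo")]).compute()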

Environment:

  • Dask version: 2.22.0
  • PyArrow version: 1.0.0
  • Python version: 3.7.4
  • Operating System: macOS 10.15.6
  • Install method (conda, pip, source): pip

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 1
  • Comments: 15 (15 by maintainers)

Most upvoted comments

Thanks! The following now works (with gather_statistics enabled):

>>> import dask
>>> dask.__version__
'2.23.0+7.gbdf0e2df'
>>> import dask.dataframe as dd
>>> dd.read_parquet('old/*.parquet', filters=[('title', '==', 'Foo')], gather_statistics=True).compute()

Reading the directory path still doesn’t work, but if there are no downsides to using a glob pattern, this solution will do for me.