dask: dd.read_parquet fails on directory path
What happened:
After upgrading to Dask 2.22.0, dd.read_parquet can no longer read some Parquet datasets (directories) that used to work with Dask 2.11.0.
What you expected to happen:
I expected the datasets to be loaded without changing the call to dd.read_parquet.
Minimal Complete Verifiable Example:
Example dataset: https://drive.google.com/file/d/1Qn5k9xZT_yegqisttGW1oGw-MwWHCY_E/view?usp=sharing
old
├── _common_metadata
├── _metadata
├── part.0.parquet
├── part.1.parquet
└── part.2.parquet
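For reference, a minimal sketch of how a dataset with this layout could have been produced; the data here is made up, and it assumes an older Dask (e.g. 2.11.0) whose pyarrow writer emitted the _metadata and _common_metadata sidecars by default:

import pandas as pd
import dask.dataframe as dd

# Illustrative data only; the real dataset comes from the link above.
pdf = pd.DataFrame({
    "date": pd.date_range("2019-01-01", periods=9),
    "title": ["Foo"] * 9,
    "description": ["Foo"] * 9,
})
pdf.index.name = "index"

# Three partitions -> part.0.parquet, part.1.parquet, part.2.parquet,
# plus the _metadata / _common_metadata sidecars written by the pyarrow engine.
dd.from_pandas(pdf, npartitions=3).to_parquet("old", engine="pyarrow")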
Trying to load this directory with dd.read_parquet fails:
>>> import dask.dataframe as dd
>>> dd.read_parquet("old").compute()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/base.py", line 167, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/base.py", line 447, in compute
results = schedule(dsk, keys, **kwargs)
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/threaded.py", line 84, in get
**kwargs
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/local.py", line 486, in get_async
raise_exception(exc, tb)
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/local.py", line 316, in reraise
raise exc
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/local.py", line 222, in execute_task
result = _execute_task(task, data)
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in read_parquet_part
dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in <listcomp>
dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 753, in read_partition
piece, columns, partitions, **kwargs
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 812, in _parquet_piece_as_arrow
**kwargs.get("read", {}),
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/pyarrow/parquet.py", line 704, in read
reader = self.open()
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/pyarrow/parquet.py", line 680, in open
reader = self.open_file_func(self.path)
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/fsspec/spec.py", line 724, in open
**kwargs
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/fsspec/implementations/local.py", line 107, in _open
return LocalFileOpener(path, mode, fs=self, **kwargs)
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/fsspec/implementations/local.py", line 173, in __init__
self._open()
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/fsspec/implementations/local.py", line 178, in _open
self.f = open(self.path, mode=self.mode)
IsADirectoryError: [Errno 21] Is a directory: '/Users/matteo/Downloads/old'
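For comparison, reading the same directory directly with pyarrow's own ParquetDataset should show whether the problem is in Dask's path handling rather than in the dataset itself; a quick check, assuming the dataset is extracted to ./old:

import pyarrow.parquet as pq

# pyarrow's (legacy) ParquetDataset accepts a directory containing
# part files plus the _metadata / _common_metadata sidecars.
dataset = pq.ParquetDataset("old")
table = dataset.read()
print(table.to_pandas().head())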
Loading the files using a glob pattern and saving them to a new directory resolves the issue:
>>> dd.read_parquet("old/*.parquet").to_parquet("new")
>>> dd.read_parquet("new").compute()
date title description
index
0 2019-01-01 Foo Foo
1 2019-01-02 Foo Foo
2 2019-01-03 Foo Foo
3 2019-01-04 Foo Foo
...
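A quick way to sanity-check that the _metadata sidecar in the original directory is itself readable (i.e. the rewrite above is not just working around a corrupt file); a sketch using pyarrow, assuming the dataset sits in ./old:

import pyarrow.parquet as pq

# Parse the dataset-level footer directly; if this succeeds, the sidecar
# is valid and the IsADirectoryError above is about path handling.
meta = pq.read_metadata("old/_metadata")
print(meta.num_row_groups, meta.num_rows, meta.num_columns)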
Because I have many large datasets that produce the above error, I’d like to be able to load the original directories without having to load and save them.
Using a glob pattern is also not an option for me, because that does not load the metadata required for filtering:
>>> dd.read_parquet("old/*.parquet", filters=[("title", "==", "Foo")]).compute()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 239, in read_parquet
**kwargs
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 651, in read_metadata
paths, fs, split_row_groups, gather_statistics, filters, dataset_kwargs
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 161, in _gather_metadata
dataset, base, fns = _get_dataset_object(paths, fs, filters, dataset_kwargs)
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 121, in _get_dataset_object
paths, filesystem=fs, filters=filters, **dataset_kwargs
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/pyarrow/parquet.py", line 1197, in __init__
self._filter(filters)
File "/Users/matteo/.pyenv/versions/3.7.4/lib/python3.7/site-packages/pyarrow/parquet.py", line 1311, in _filter
accepts_filter = self.partitions.filter_accepts_partition
AttributeError: 'NoneType' object has no attribute 'filter_accepts_partition'
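A possible stopgap is to filter after loading instead of pushing the filter down, at the cost of scanning every row group; a sketch using the same glob-pattern read:

import dask.dataframe as dd

# Read everything via the glob pattern, then filter in memory.
# This sidesteps the filters= code path entirely, but loses
# row-group pruning, so all files are still read.
ddf = dd.read_parquet("old/*.parquet")
result = ddf[ddf["title"] == "Foo"].compute()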
I’d like to be able to load directories with filters, like this:
>>> dd.read_parquet("new", filters=[("title", "==", "Foo")]).compute()
date title description
index
0 2019-01-01 Foo Foo
1 2019-01-02 Foo Foo
2 2019-01-03 Foo Foo
3 2019-01-04 Foo Foo
...
Environment:
- Dask version: 2.22.0
- PyArrow version: 1.0.0
- Python version: 3.7.4
- Operating System: macOS 10.15.6
- Install method (conda, pip, source): pip
About this issue
- State: closed
- Created 4 years ago
- Reactions: 1
- Comments: 15 (15 by maintainers)
Thanks! The following now works (with gather_statistics enabled):
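Roughly, assuming the glob-pattern call from above with gather_statistics=True and the same filter (the exact snippet may have differed):

import dask.dataframe as dd

# gather_statistics=True asks Dask to collect row-group statistics from
# the data files themselves (no _metadata sidecar needed), which lets
# filters= prune row groups.
df = dd.read_parquet(
    "old/*.parquet",
    gather_statistics=True,
    filters=[("title", "==", "Foo")],
).compute()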
Reading the directory path still doesn’t work, but if there are no downsides to using a glob pattern, this solution will do for me.