dask: Globs don't work when reading multiple hive-format parquet files

Hi,

We have a directory full of hive-format parquet files (generated by fastparquet, but I don’t think it’s relevant here), one for each month, which in totality form a year.

I’d like to load these files all into one dataframe, with the glob functionality:

df  = ddf.read_parquet("../data/processed/2016*.parq")

While this works for CSVs and such, it doesn’t seem to work for parquet files - I think the machinery that does the glob matching doesn’t really know how to deal with the fact that hive-format parquet files are already directories.

Loading the files individually works OK:


In [12]: df  = ddf.read_parquet("../data/processed/201601.parq")

The error we get:


In [13]: df  = ddf.read_parquet("../data/processed/2016*.parq")
---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep)
     46             self.fn = fn2
---> 47             with open_with(fn2, 'rb') as f:
     48                 self._parse_header(f, verify)

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/bytes/core.py in __enter__(self)
    313         mode = self.mode.replace('t', '').replace('b', '') + 'b'
--> 314         f = f2 = self.myopen(self.path, mode=mode)
    315         CompressFile = merge(seekable_files, compress_files)[self.compression]

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/bytes/local.py in open(self, path, mode, **kwargs)
     62             path = path[len('file://'):]
---> 63         return open(path, mode=mode)
     64 

FileNotFoundError: [Errno 2] No such file or directory: '../data/processed/2016*.parq/_metadata/_metadata'

During handling of the above exception, another exception occurred:

FileNotFoundError                         Traceback (most recent call last)
/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options)
     74                                      open_with=myopen,
---> 75                                      sep=myopen.fs.sep)
     76     except:

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep)
     51             self.fn = fn
---> 52             with open_with(fn, 'rb') as f:
     53                 self._parse_header(f, verify)

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/bytes/core.py in __enter__(self)
    313         mode = self.mode.replace('t', '').replace('b', '') + 'b'
--> 314         f = f2 = self.myopen(self.path, mode=mode)
    315         CompressFile = merge(seekable_files, compress_files)[self.compression]

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/bytes/local.py in open(self, path, mode, **kwargs)
     62             path = path[len('file://'):]
---> 63         return open(path, mode=mode)
     64 

FileNotFoundError: [Errno 2] No such file or directory: '../data/processed/2016*.parq/_metadata'

During handling of the above exception, another exception occurred:

FileNotFoundError                         Traceback (most recent call last)
/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep)
     46             self.fn = fn2
---> 47             with open_with(fn2, 'rb') as f:
     48                 self._parse_header(f, verify)

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/bytes/core.py in __enter__(self)
    313         mode = self.mode.replace('t', '').replace('b', '') + 'b'
--> 314         f = f2 = self.myopen(self.path, mode=mode)
    315         CompressFile = merge(seekable_files, compress_files)[self.compression]

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/bytes/local.py in open(self, path, mode, **kwargs)
     62             path = path[len('file://'):]
---> 63         return open(path, mode=mode)
     64 

FileNotFoundError: [Errno 2] No such file or directory: '../data/processed/2016*.parq/_metadata'

During handling of the above exception, another exception occurred:

FileNotFoundError                         Traceback (most recent call last)
<ipython-input-13-2d2b23a42644> in <module>()
----> 1 df  = ddf.read_parquet("../data/processed/2016*.parq")

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options)
     75                                      sep=myopen.fs.sep)
     76     except:
---> 77         pf = fastparquet.ParquetFile(path, open_with=myopen, sep=myopen.fs.sep)
     78 
     79     check_column_names(pf.columns, categories)

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep)
     50         except (IOError, OSError):
     51             self.fn = fn
---> 52             with open_with(fn, 'rb') as f:
     53                 self._parse_header(f, verify)
     54         if all(rg.columns[0].file_path is None for rg in self.row_groups):

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/bytes/core.py in __enter__(self)
    312     def __enter__(self):
    313         mode = self.mode.replace('t', '').replace('b', '') + 'b'
--> 314         f = f2 = self.myopen(self.path, mode=mode)
    315         CompressFile = merge(seekable_files, compress_files)[self.compression]
    316         if PY2:

/nfs/projects_nobackup/c/cidgrowlab/Mali/MBTA/mbtaenv/lib/python3.6/site-packages/dask/bytes/local.py in open(self, path, mode, **kwargs)
     61         if path.startswith('file://'):
     62             path = path[len('file://'):]
---> 63         return open(path, mode=mode)
     64 
     65     def ukey(self, path):

FileNotFoundError: [Errno 2] No such file or directory: '../data/processed/2016*.parq'

The fact that it’s looking for ../data/processed/2016*.parq/_metadata/_metadata suggests that there is something kinda wonky going on.

Specifying the file scheme as hive to the storage engine doesn’t seem to resolve the issue, and neither does trying different combos of wildcards and slashes. As a last ditch effort, I tried to pass a list of parquet files into read_parquet and that doesnt’ seem to be supported (even just this would be super handy).

My workaround is probably going to be to do a delayed for-loop read and concat after that, but I’m not sure how that’d perform in comparison to the other way. Regardless, it would be super nice to have this. Thank you for reading!

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 15 (8 by maintainers)

Most upvoted comments

Also, we now support globs in dataframe.read_parquet, so going to close this issue.