dask: Applying aggregations to a dataframe fails when reading data from partitioned parquet

This code is a toy example of aggregating a dataframe and producing multiple aggregated columns. It works (thanks to @martindurant).
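
Since the snippet itself is not inlined here, this is a minimal sketch of the working pattern (the column names and the aggregation body are illustrative assumptions, modeled on the gb.apply call visible in the traceback below):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({
    'session_id': ['xxx', 'xxx', 'yyy'],
    'ts': pd.to_datetime(['2017-09-15 00:15:00',
                          '2017-09-15 00:30:00',
                          '2017-09-15 00:45:00']),
})
df = dd.from_pandas(pdf, npartitions=2)

# apply a function that returns a DataFrame per group,
# with meta spelling out the resulting columns and dtypes
ag = df.groupby('session_id').apply(
    lambda d: pd.DataFrame({
        'pages': [len(d)],
        'duration': [(d.ts.max() - d.ts.min()).total_seconds()],
    }),
    meta={'pages': 'i8', 'duration': 'f8'},
)
print(ag.compute())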

This code attempts the same aggregation on a dataframe read from partitioned parquet files. It does not work, and I can't figure out why.
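
A reconstruction of aggregate2.py (the read_parquet path, column list, and lambda body are assumptions; the overall structure matches the traceback):

import pandas as pd
import dask.dataframe as dd

# read the attached partitioned parquet data
df = dd.read_parquet('events',
                     columns=['customer', 'url', 'ts', 'session_id', 'referrer'])
gb = df.groupby('session_id')
ag = gb.apply(lambda d: pd.DataFrame({
    'pages': [len(d)],
    'duration': [(d.ts.max() - d.ts.min()).total_seconds()],
}), meta={'pages': 'i8', 'duration': 'f8'})
print(ag.compute())

The exception looks like this: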

aggregate2.py:25: UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  ag = gb.apply(lambda d: pd.DataFrame({
Traceback (most recent call last):
  File "aggregate2.py", line 30, in <module>
    print(ag.compute())
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/base.py", line 99, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/base.py", line 206, in compute
    results = get(dsk, keys, **kwargs)
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/threaded.py", line 75, in get
    pack_exception=pack_exception, **kwargs)
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/local.py", line 521, in get_async
    raise_exception(exc, tb)
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/local.py", line 290, in execute_task
    result = _execute_task(task, data)
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/local.py", line 271, in _execute_task
    return func(*args2)
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/core.py", line 3194, in apply_and_enforce
    return _rename(c, df)
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/core.py", line 3231, in _rename
    df.columns = columns
  File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/generic.py", line 3094, in __setattr__
    return object.__setattr__(self, name, value)
  File "pandas/_libs/src/properties.pyx", line 65, in pandas._libs.lib.AxisProperty.__set__ (pandas/_libs/lib.c:45255)
  File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/generic.py", line 473, in _set_axis
    self._data.set_axis(axis, labels)
  File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/internals.py", line 2836, in set_axis
    (old_len, new_len))
ValueError: Length mismatch: Expected axis has 4 elements, new values have 7 elements

I also noticed a few odd things about the loaded parquet. Originally, the data was partitioned on year, month, day, hour, and customer. After reading the files, even when specifying columns=['customer', 'url', 'ts', 'session_id', 'referrer'], hour shows up in the data when I look at df.head(). In fact, specifying columns=['customer', 'url', 'ts', 'session_id', 'referrer'] in dd.read_parquet does not seem to take effect at all; the dataframe looks like this:

ipdb> df.head()
                       url              referrer session_id                  ts customer hour
0  http://a.com/articles/1    http://google.com/        xxx 2017-09-15 00:15:00    a.com    0
1  http://a.com/articles/2      http://bing.com/        yyy 2017-09-15 00:30:00    a.com    0
2  http://a.com/articles/2  http://facebook.com/        yyy 2017-09-15 00:45:00    a.com    0

with year, month, and day not present in the dataframe, while customer and hour are. I would expect all partition keys to be either read or dropped together, but this seems to happen selectively.
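
A quick way to see this (a hypothetical check against the attached events directory):

import dask.dataframe as dd

df = dd.read_parquet('events',
                     columns=['customer', 'url', 'ts', 'session_id', 'referrer'])
print(df.columns)
# 'hour' appears even though it was not requested,
# while 'year', 'month', and 'day' are dropped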

I attached the parquet data in question.

events.zip

This issue originated from https://stackoverflow.com/questions/46375382/aggregate-a-dask-dataframe-and-produce-a-dataframe-of-aggregates/46380632#46380632.

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 28 (28 by maintainers)

Most upvoted comments

Sorry, I meant subsetting after this line:

df = dd.from_delayed(dfs)

We would accept that those columns show up, and then get rid of them immediately afterwards. This is somewhat inefficient, because we allocate memory for columns that we then release, but it would, I think, be relatively fool-proof. Something like the sketch below.
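
A minimal sketch of that workaround (the filenames and the per-file reader are hypothetical; dfs stands for the delayed readers discussed earlier in the thread):

import pandas as pd
from dask import delayed
import dask.dataframe as dd

# hypothetical: one delayed pandas reader per parquet file
filenames = ['events/part.0.parquet', 'events/part.1.parquet']
dfs = [delayed(pd.read_parquet)(fn) for fn in filenames]

df = dd.from_delayed(dfs)
# accept that the partition columns show up, then drop them right away
df = df[['customer', 'url', 'ts', 'session_id', 'referrer']]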