dask: read_parquet + getitem optimization no longer working
The optimization introduced in #1785 no longer works on master.
import dask
import dask.dataframe as dd
import pandas as pd
# Create test dataset
df = pd.DataFrame({'a': [1] * 10000, 'b': [2] * 10000})
df.to_parquet('test.parquet', engine='fastparquet')
# Open it, get a subset of items and try to optimize the graph
ddi = dd.read_parquet('test.parquet', engine='fastparquet')[['a']]
dsk = ddi.dask
dsk2 = dd.optimize(dsk, ddi.__dask_keys__())
dsk2[('read-parquet-getitem-e42c1003708b81c7d6bfcbf6eaa5ac3d', 0)]
returns
(subgraph_callable, (<function _read_parquet_row_group>,..., ['a', 'b'], ...), ['a'])
whereas in the past we were seeing:
(<function _read_parquet_row_group>,..., ['a'], ...)
The problem seems to be that dask.optimization.fuse_selections does not handle subgraph_callable. I see two possible solutions:
- Convert read_parquet into a HighLevelGraph, so that the optimization can be performed between 2 layers, instead of among tasks
- Modify fuse_selections to handle subgraph_callable
What would you suggest? I could find some time to work on it. Thank you for your help!
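To make the second option concrete, here is a toy sketch in plain Python. It is not dask's fuse_selections: read_columns, select_columns, ToySubgraphCallable and push_selection are all hypothetical stand-ins, and the task tuples only mimic the shape of the output shown above. The idea is to look through the wrapper, recognise the selection, and rewrite the read so that it only asks for the selected columns.

```python
def read_columns(path, columns):
    """Hypothetical stand-in for the parquet reader: returns only `columns`."""
    table = {"a": [1, 1, 1], "b": [2, 2, 2]}  # pretend file contents
    return {name: table[name] for name in columns}


def select_columns(table, columns):
    """Hypothetical stand-in for DataFrame.__getitem__ with a list of columns."""
    return {name: table[name] for name in columns}


class ToySubgraphCallable:
    """Toy wrapper hiding one function; an assumption, not dask's class."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args):
        return self.func(*args)


def unwrap(func):
    """Return the function hidden inside a ToySubgraphCallable, else func."""
    return func.func if isinstance(func, ToySubgraphCallable) else func


def push_selection(task):
    """Rewrite (select, (read, path, cols), selection) into a narrower read."""
    if not (isinstance(task, tuple) and len(task) == 3):
        return task
    func, inner, selection = task
    if unwrap(func) is not select_columns:
        return task
    if not (isinstance(inner, tuple) and inner and inner[0] is read_columns):
        return task
    _, path, columns = inner
    if not set(selection) <= set(columns):
        return task  # selection asks for something the read does not provide
    return (read_columns, path, list(selection))


def execute(task):
    """Tiny recursive evaluator for tuple-style tasks."""
    if isinstance(task, tuple) and task and callable(task[0]):
        return task[0](*(execute(arg) for arg in task[1:]))
    return task


plain = (select_columns, (read_columns, "test.parquet", ["a", "b"]), ["a"])
wrapped = (ToySubgraphCallable(select_columns),
           (read_columns, "test.parquet", ["a", "b"]), ["a"])

# Both forms collapse to a read of column 'a' only.
optimized_plain = push_selection(plain)
optimized_wrapped = push_selection(wrapped)
```

Without the unwrap step the wrapped task would be left untouched, which matches the behaviour reported here.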
About this issue
- State: closed
- Created 5 years ago
- Comments: 15 (7 by maintainers)
Yes.
There really isn’t any standard way to do this today. You’re in very new territory for all of us. Really, the only constraint is that you make sure that whatever you do works well with how DataFrame.__getitem__ operates. I could imagine making a subclass of Blockwise that was specific for getitem and testing for that, or, as you suggest, just testing the value of the callable within the Blockwise call.
We don’t have any best practices around optimizations yet, so as long as what you do is decently simple it will be interesting to see. If you complete this work then it would also be good to get your thoughts on the experience, and what we might do to design things to make optimizations like this more effective in the future.
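As a rough illustration of "testing the value of the callable", here is a layer-level sketch in plain Python. ReadLayer, BlockwiseLayer and optimize_getitem are hypothetical names made up for this example; they are not dask's Blockwise or HighLevelGraph classes, and a real implementation would have to follow whatever DataFrame.__getitem__ actually produces.

```python
from dataclasses import dataclass, replace
from operator import getitem


@dataclass(frozen=True)
class ReadLayer:
    """Hypothetical layer describing a columnar read."""
    name: str
    path: str
    columns: tuple


@dataclass(frozen=True)
class BlockwiseLayer:
    """Hypothetical layer applying `func` to every block of `source`."""
    name: str
    func: object
    source: str
    arg: object


def optimize_getitem(layers):
    """Narrow each ReadLayer to the columns requested by its getitem consumers."""
    wanted = {}      # read-layer name -> columns requested via getitem
    blocked = set()  # read layers with a consumer we cannot reason about
    for layer in layers.values():
        if not isinstance(layer, BlockwiseLayer):
            continue
        if not isinstance(layers.get(layer.source), ReadLayer):
            continue
        # The check suggested above: inspect the callable held by the layer.
        if layer.func is getitem and isinstance(layer.arg, (list, tuple)):
            wanted.setdefault(layer.source, set()).update(layer.arg)
        else:
            blocked.add(layer.source)
    out = dict(layers)
    for name, cols in wanted.items():
        if name in blocked:
            continue  # some other consumer may need every column
        read = layers[name]
        kept = tuple(c for c in read.columns if c in cols)
        out[name] = replace(read, columns=kept)
    return out


layers = {
    "read": ReadLayer("read", "test.parquet", ("a", "b")),
    "getitem": BlockwiseLayer("getitem", getitem, "read", ("a",)),
}
optimized = optimize_getitem(layers)
```

The getitem layer is left in place on purpose: selecting a column from a frame that already contains only that column is still valid, so the rewrite only has to touch the read layer. A Blockwise subclass specific to getitem would replace the `layer.func is getitem` test with an isinstance check.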