dask: Dataframe merge fails with KeyError
What happened:
Dask optimization fails when trying to merge two dataframes
What you expected to happen:
The two dataframes are merged
Minimal Complete Verifiable Example:
Load a reasonably large dataset to the cluster
import dask.dataframe as dd  # needed for the dd.* calls below

base_uri = "http://datasets.imdbws.com"
file_list = [
    "name.basics.tsv.gz",
    "title.basics.tsv.gz",
    "title.principals.tsv.gz",
    "title.ratings.tsv.gz"
]
for file_name in file_list:
    (dd.read_csv("/".join((base_uri, file_name)),
                 sep="\t",
                 blocksize=None,
                 dtype={
                     'isOriginalTitle': 'object',
                     'isAdult': 'object',
                     'startYear': 'object'
                 })
     .repartition(partition_size="100MB")
     .to_csv(f"s3://imdb-datasets/{'.'.join(file_name.split('.')[:2])}.csv.gz",
             compression="gzip",
             storage_options=storage_options()))
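Note (editorial): `storage_options()` is never defined in the issue; it is presumably a small user helper returning the fsspec/s3fs options for the bucket. A hypothetical stand-in, assuming credentials come from environment variables (only the "key"/"secret" option names are standard s3fs settings; the helper itself and the variable names are assumptions), could look like:

import os

def storage_options():
    # Hypothetical helper (not shown in the original issue): return the
    # s3fs options passed as storage_options= to dd.read_csv/.to_csv above.
    return {
        "key": os.environ["AWS_ACCESS_KEY_ID"],
        "secret": os.environ["AWS_SECRET_ACCESS_KEY"],
    }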
Load the files as Dask dataframes
people = dd.read_csv("s3://imdb-datasets/name.basics.csv.gz/*",
                     compression="gzip",
                     blocksize=None,
                     storage_options=storage_options())
movies = dd.read_csv("s3://imdb-datasets/title.basics.csv.gz/*",
                     compression="gzip",
                     blocksize=None,
                     dtype={'startYear': 'object', 'isAdult': 'object'},
                     storage_options=storage_options())
ratings = dd.read_csv("s3://imdb-datasets/title.ratings.csv.gz/*",
                      compression="gzip",
                      blocksize=None,
                      storage_options=storage_options())
principals = dd.read_csv("s3://imdb-datasets/title.principals.csv.gz/*",
                         compression="gzip",
                         blocksize=None,
                         storage_options=storage_options())[["tconst", "nconst", "category", "job"]]
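(Editorial aside, not part of the original report.) Because the files are read without setting an index, the resulting dataframes have unknown divisions, so the larger column merges below have to shuffle the data; this is how the `ShuffleLayer` discussed later in the thread ends up in the graph. A quick check:

# These dataframes are read without an index, so divisions are unknown and
# the larger merges below are shuffle-based rather than partition-aligned.
print(people.npartitions, people.known_divisions)
print(principals.npartitions, principals.known_divisions)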
Do some merges
filtered_ratings = ratings[ratings.numVotes > 1000]
relevant_movies = filtered_ratings[["tconst"]]
filtered_movies = (
    movies
    [['tconst', 'primaryTitle', 'originalTitle', 'startYear', 'runtimeMinutes']]
    .merge(relevant_movies, on='tconst', how='inner')
)
category_index = principals.category.unique().reset_index()
job_index = principals.job.unique().reset_index()
filtered_crew = (
    principals
    .merge(category_index, on='category', how='left')
    .merge(job_index, on='job', how='left', suffixes=('_category', '_job'))
    .merge(relevant_movies, on='tconst', how='inner')
    [["tconst", "nconst", "index_category", "index_job"]]
)
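(Editorial aside, not part of the original report.) One way to see which layers make up the task graph at this point: in Dask 2021.1.0 a collection's graph is a `HighLevelGraph`, and its `layers` mapping is keyed by names such as `assign-<token>` and `shuffle-<token>`; the tuple in the KeyError below is simply `(task name, partition index)`.

# Editorial sketch: list the high-level graph layers behind filtered_crew.
for name, layer in filtered_crew.__dask_graph__().layers.items():
    print(name, type(layer).__name__)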
And if I try to merge `filtered_crew` with `people`
people.merge(filtered_crew, on="nconst", how="inner").head()
I get the following error
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-6-4b2a05156842> in <module>
----> 1 people.merge(filtered_crew, on="nconst", how="inner").head()
~/.local/lib/python3.8/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
1034 Whether to compute the result, default is True.
1035 """
-> 1036 return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
1037
1038 def _head(self, n, npartitions, compute, safe):
~/.local/lib/python3.8/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
1067
1068 if compute:
-> 1069 result = result.compute()
1070 return result
1071
~/.local/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
277 dask.base.compute
278 """
--> 279 (result,) = compute(self, traverse=False, **kwargs)
280 return result
281
~/.local/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
553 )
554
--> 555 dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
556 keys, postcomputes = [], []
557 for x in collections:
~/.local/lib/python3.8/site-packages/dask/base.py in collections_to_dsk(collections, optimize_graph, optimizations, **kwargs)
329 for opt, val in groups.items():
330 dsk, keys = _extract_graph_and_keys(val)
--> 331 dsk = opt(dsk, keys, **kwargs)
332
333 for opt in optimizations:
~/.local/lib/python3.8/site-packages/dask/dataframe/optimize.py in optimize(dsk, keys, **kwargs)
32 if fuse_subgraphs is None:
33 fuse_subgraphs = True
---> 34 dsk, _ = fuse(
35 dsk,
36 keys,
~/.local/lib/python3.8/site-packages/dask/optimization.py in fuse(dsk, keys, dependencies, ave_width, max_width, max_height, max_depth_new_edges, rename_keys, fuse_subgraphs)
598 child = children_stack[-1]
599 if child != parent:
--> 600 children = reducible & deps[child]
601 while children:
602 # Depth-first search
KeyError: ('assign-6806e9502a8527bf71183e8bcc82e738', 48)
Anything else we need to know?:
I can get the same result successfully by not using the `filtered_crew` dataframe
relevant_people = principals.merge(relevant_movies, on="tconst", how='inner')[['nconst']].drop_duplicates()
filtered_people = people.merge(relevant_people, on="nconst", how="inner")
filtered_people.head()
Environment:
- Dask version: 2021.1.0
- Python version: 3.8.0
- Operating System: Linux
- Install method (conda, pip, source): Dask helm chart
Thanks for raising and working out a reproducer @guillemborrell and @rubenvdg (and thanks @jsignell for the ping).
As far as I can tell, the error is from a bug in `ShuffleLayer._cull_dependencies` (at some point a change was made in graph construction but not in the dependency-culling logic). While investigating, I did find a `Blockwise` bug, but it was unrelated to this issue. Please do let me know if #7213 resolves this issue.

These changes fix the issue for me. Thanks @rjzamora!
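(Editorial aside, not part of the thread.) Since the traceback above shows the KeyError being raised inside the optimization pass (`collections_to_dsk` → `optimize` → `fuse`), one hedged way to check whether a similar failure is confined to graph optimization is to compute with optimization disabled. This is only a diagnostic sketch, not a substitute for the fix referenced above, and skipping optimization can be noticeably slower:

import dask

# Build the same small "head" graph without computing it, then compute with
# graph optimization turned off so the cull/fuse step above is skipped.
preview = people.merge(filtered_crew, on="nconst", how="inner").head(compute=False)
(result,) = dask.compute(preview, optimize_graph=False)
print(result)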
Whoops, it is now : )
Could you confirm that this reproduces your problem as well (at least it does here)? Please try to make your minimal verifiable example as minimal as possible.