dask: Dataframe merge fails with KeyError

What happened:

Dask's graph optimization raises a KeyError when I try to merge two dataframes

What you expected to happen:

The two dataframes are merged

Minimal Complete Verifiable Example:

Load a reasonably-large dataset to the cluster

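import dask.dataframe as dd

# NOTE: storage_options() is a user-defined helper that is not shown in this report.
base_uri = "http://datasets.imdbws.com"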
file_list = [
    "name.basics.tsv.gz",
    "title.basics.tsv.gz",
    "title.principals.tsv.gz",
    "title.ratings.tsv.gz"
]
for file_name in file_list:
    (dd.read_csv("/".join((base_uri, file_name)),
                 sep="\t", 
                 blocksize=None, 
                 dtype={
                     'isOriginalTitle': 'object', 
                     'isAdult': 'object',
                     'startYear': 'object'
                 }
                )
     .repartition(partition_size="100MB")
     .to_csv(f"s3://imdb-datasets/{'.'.join(file_name.split('.')[:2])}.csv.gz",
             compression="gzip",
             storage_options=storage_options())
    )

Load the files as Dask dataframes

people = dd.read_csv("s3://imdb-datasets/name.basics.csv.gz/*", 
                     compression="gzip",
                     blocksize=None,
                     storage_options=storage_options())

movies = dd.read_csv("s3://imdb-datasets/title.basics.csv.gz/*",
                     compression="gzip",
                     blocksize=None,
                     dtype={'startYear': 'object', 'isAdult': 'object'},
                     storage_options=storage_options())

ratings = dd.read_csv("s3://imdb-datasets/title.ratings.csv.gz/*",
                      compression="gzip",
                      blocksize=None,
                      storage_options=storage_options())

principals = dd.read_csv("s3://imdb-datasets/title.principals.csv.gz/*", 
                         compression="gzip",
                         blocksize=None,
                         storage_options=storage_options())[["tconst", "nconst", "category", "job"]]

Do some merges

filtered_ratings = ratings[ratings.numVotes > 1000]

relevant_movies = filtered_ratings[["tconst"]]

filtered_movies = (
    movies
    [['tconst', 'primaryTitle', 'originalTitle', 'startYear', 'runtimeMinutes']]
    .merge(relevant_movies, on='tconst', how='inner')
)

category_index = principals.category.unique().reset_index()
job_index = principals.job.unique().reset_index()

filtered_crew = (
    principals
    .merge(category_index, on='category', how='left')
    .merge(job_index, on='job', how='left', suffixes=('_category', '_job'))
    .merge(relevant_movies, on='tconst', how='inner')
    [["tconst", "nconst", "index_category", "index_job"]]
) 
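The `index_category` and `index_job` columns selected above come from the `suffixes` argument: both lookup tables carry a column named `index`, so the second merge has to disambiguate them. Here is a minimal sketch of that renaming in plain pandas, using made-up rows rather than the IMDb data. The `build_index` helper is hypothetical and stands in for `.unique().reset_index()`, because in pandas `Series.unique()` returns an array and not a Series.

```python
import pandas as pd

# Made-up stand-in for the `principals` dataframe (not real IMDb rows).
principals = pd.DataFrame({
    "tconst": ["tt1", "tt1", "tt2"],
    "nconst": ["nm1", "nm2", "nm1"],
    "category": ["actor", "director", "actor"],
    "job": ["lead", "director", "lead"],
})


def build_index(series):
    """Hypothetical helper: one row per distinct value, numbered from 0.

    Returns a dataframe with the columns "index" and the series name.
    """
    unique_values = series.drop_duplicates().reset_index(drop=True)
    return unique_values.reset_index()


category_index = build_index(principals["category"])
job_index = build_index(principals["job"])

encoded = (
    principals
    # After this merge the frame has a single column named "index".
    .merge(category_index, on="category", how="left")
    # job_index also has an "index" column, so the suffixes are applied:
    # left "index" -> "index_category", right "index" -> "index_job".
    .merge(job_index, on="job", how="left", suffixes=("_category", "_job"))
)

print(list(encoded.columns))
```

The suffixes only take effect on the second merge, because that is the first time both sides have a non-key column with the same name.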

And if I try to merge filtered_crew with `people`

people.merge(filtered_crew, on="nconst", how="inner").head()

I get the following error

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-6-4b2a05156842> in <module>
----> 1 people.merge(filtered_crew, on="nconst", how="inner").head()

~/.local/lib/python3.8/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
   1034             Whether to compute the result, default is True.
   1035         """
-> 1036         return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
   1037 
   1038     def _head(self, n, npartitions, compute, safe):

~/.local/lib/python3.8/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
   1067 
   1068         if compute:
-> 1069             result = result.compute()
   1070         return result
   1071 

~/.local/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    277         dask.base.compute
    278         """
--> 279         (result,) = compute(self, traverse=False, **kwargs)
    280         return result
    281 

~/.local/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    553     )
    554 
--> 555     dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
    556     keys, postcomputes = [], []
    557     for x in collections:

~/.local/lib/python3.8/site-packages/dask/base.py in collections_to_dsk(collections, optimize_graph, optimizations, **kwargs)
    329         for opt, val in groups.items():
    330             dsk, keys = _extract_graph_and_keys(val)
--> 331             dsk = opt(dsk, keys, **kwargs)
    332 
    333             for opt in optimizations:

~/.local/lib/python3.8/site-packages/dask/dataframe/optimize.py in optimize(dsk, keys, **kwargs)
     32     if fuse_subgraphs is None:
     33         fuse_subgraphs = True
---> 34     dsk, _ = fuse(
     35         dsk,
     36         keys,

~/.local/lib/python3.8/site-packages/dask/optimization.py in fuse(dsk, keys, dependencies, ave_width, max_width, max_height, max_depth_new_edges, rename_keys, fuse_subgraphs)
    598             child = children_stack[-1]
    599             if child != parent:
--> 600                 children = reducible & deps[child]
    601                 while children:
    602                     # Depth-first search

KeyError: ('assign-6806e9502a8527bf71183e8bcc82e738', 48)

Anything else we need to know?:

I can get the same result successfully by not using the filtered_crew dataframe

relevant_people = principals.merge(relevant_movies, on="tconst", how='inner')[['nconst']].drop_duplicates()
filtered_people = people.merge(relevant_people, on="nconst", how="inner")
filtered_people.head()

Environment:

  • Dask version: 2021.1.0
  • Python version: 3.8.0
  • Operating System: Linux
  • Install method (conda, pip, source): Dask helm chart

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 19 (11 by maintainers)

Most upvoted comments

Thanks for raising and working out a reproducer @guillemborrell and @rubenvdg (and thanks @jsignell for the ping).

As far as I can tell, the error is from a bug in ShuffleLayer._cull_dependencies (at some point a change was made in graph construction but not in the dependency-culling logic). While investigating, I did find a Blockwise bug, but it was unrelated to this issue. Please do let me know if #7213 resolves this issue.
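To illustrate the kind of mismatch described above, here is a toy sketch in plain Python. It is not Dask's actual code: the graph, the `stale_layer_dependencies` rule, and the `cull` and `walk` functions are all hypothetical. It only shows how a dependency map built with out-of-date assumptions about the graph layout can lead to a `KeyError` on a key that the tasks themselves still reference.

```python
# Toy task graph: each key maps to the keys its task actually reads.
graph = {
    ("merge", 0): {("shuffle", 0)},
    ("shuffle", 0): {("assign", 0)},
    ("assign", 0): {("read", 0)},
    ("read", 0): set(),
}


def stale_layer_dependencies(key):
    """Hypothetical culling rule written for an older graph layout,
    in which shuffle tasks read directly from 'read' tasks."""
    name, index = key
    if name == "shuffle":
        return {("read", index)}
    return graph[key]


def cull(output_keys):
    """Build the dependency map by following the (stale) culling rule."""
    deps = {}
    stack = list(output_keys)
    while stack:
        key = stack.pop()
        if key not in deps:
            deps[key] = stale_layer_dependencies(key)
            stack.extend(deps[key])
    return deps


def walk(output_keys, deps):
    """Follow the references stored in the tasks and look each one up
    in deps, as a later optimization pass might."""
    visited = []
    stack = list(output_keys)
    while stack:
        key = stack.pop()
        visited.append(key)
        deps[key]  # raises KeyError if culling never recorded this key
        stack.extend(graph[key])
    return visited


deps = cull([("merge", 0)])
try:
    walk([("merge", 0)], deps)
    missing_key = None
except KeyError as error:
    missing_key = error.args[0]

print(missing_key)
```

In this toy, the `("assign", 0)` task is still referenced by the shuffle task but never enters the dependency map, which has the same shape as the `KeyError: ('assign-…', 48)` in the report.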

These changes fix the issue for me. Thanks @rjzamora!

Whoops, it is now : )

Could you confirm that

ddf1 = (
    dd.read_csv(
        "http://datasets.imdbws.com/name.basics.tsv.gz",
        sep="\t",
        blocksize=None,
        dtype={'isOriginalTitle': 'object', 'isAdult': 'object', 'startYear': 'object'}
    )
    .repartition(partition_size="100MB")
)

ddf2 = (
    dd.read_csv(
        "http://datasets.imdbws.com/title.principals.tsv.gz",
        sep="\t",
        blocksize=None,
        dtype={'isOriginalTitle': 'object', 'isAdult': 'object', 'startYear': 'object'}
    )
    .repartition(partition_size="100MB")
)

ddf1.merge(ddf2, on="nconst", how="inner").head()

reproduces your problem as well (at least here it does). Please try to make your minimum verifiable example as minimal as possible.